diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 7e471a2ebc..4d5523be74 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -14,7 +14,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use papyrus_consensus::types::{ ConsensusContext, ConsensusError, @@ -23,7 +23,15 @@ use papyrus_consensus::types::{ ValidatorId, }; use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote}; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + Proposal, + ProposalFin, + ProposalInit, + ProposalPart, + TransactionBatch, + Vote, +}; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader}; @@ -36,10 +44,12 @@ use tracing::{debug, debug_span, info, warn, Instrument}; type HeightToIdToContent = BTreeMap>>; +const CHANNEL_SIZE: usize = 100; + pub struct PapyrusConsensusContext { storage_reader: StorageReader, network_broadcast_client: BroadcastTopicClient, - _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, + network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, validators: Vec, sync_broadcast_sender: Option>, // Proposal building/validating returns immediately, leaving the actual processing to a spawned @@ -52,14 +62,14 @@ impl PapyrusConsensusContext { pub fn new( storage_reader: StorageReader, network_broadcast_client: BroadcastTopicClient, - _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, + network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, num_validators: u64, sync_broadcast_sender: Option>, ) -> Self { Self { storage_reader, network_broadcast_client, - _network_proposal_sender, + network_proposal_sender, validators: (0..num_validators).map(ContractAddress::from).collect(), sync_broadcast_sender, valid_proposals: Arc::new(Mutex::new(BTreeMap::new())), @@ -77,7 +87,7 @@ impl ConsensusContext for PapyrusConsensusContext { proposal_init: ProposalInit, _timeout: Duration, ) -> oneshot::Receiver { - let mut network_broadcast_sender = self.network_broadcast_client.clone(); + let mut proposal_sender_sender = self.network_proposal_sender.clone(); let (fin_sender, fin_receiver) = oneshot::channel(); let storage_reader = self.storage_reader.clone(); @@ -113,18 +123,27 @@ impl ConsensusContext for PapyrusConsensusContext { }) .block_hash; - let proposal = Proposal { - height: proposal_init.height.0, - round: proposal_init.round, - proposer: proposal_init.proposer, - transactions: transactions.clone(), - block_hash, - valid_round: proposal_init.valid_round, - }; - network_broadcast_sender - .broadcast_message(ConsensusMessage::Proposal(proposal)) + let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); + let stream_id = proposal_init.height.0; + proposal_sender_sender + .send((stream_id, proposal_receiver)) + .await + .expect("Failed to send proposal receiver"); + proposal_sender + .send(Self::ProposalPart::Init(proposal_init.clone())) + .await + .expect("Failed to send proposal init"); + proposal_sender + .send(ProposalPart::Transactions(TransactionBatch { + transactions: transactions.clone(), + tx_hashes: vec![], + })) + .await + .expect("Failed to send transactions"); + proposal_sender + .send(ProposalPart::Fin(ProposalFin { proposal_content_id: block_hash })) .await - .expect("Failed to send proposal"); + .expect("Failed to send fin"); { let mut proposals = valid_proposals .lock() diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 136e29d678..1bc36ce22c 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -11,7 +11,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use papyrus_consensus::types::{ ConsensusContext, ConsensusError, @@ -19,7 +19,7 @@ use papyrus_consensus::types::{ Round, ValidatorId, }; -use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; +use papyrus_network::network_manager::BroadcastTopicClient; use papyrus_protobuf::consensus::{ ConsensusMessage, ProposalFin, @@ -54,6 +54,8 @@ type HeightToIdToContent = BTreeMap, ProposalId)>>; type ValidationParams = (BlockNumber, Duration, mpsc::Receiver>); +const CHANNEL_SIZE: usize = 100; + pub struct SequencerConsensusContext { batcher: Arc, validators: Vec, @@ -67,7 +69,6 @@ pub struct SequencerConsensusContext { proposal_id: u64, current_height: Option, current_round: Round, - network_broadcast_client: BroadcastTopicClient, // The active proposal refers to the proposal being validated at the current height/round. // Building proposals are not tracked as active, as consensus can't move on to the next // height/round until building is done. Context only works on proposals for the @@ -75,20 +76,21 @@ pub struct SequencerConsensusContext { active_proposal: Option<(Arc, JoinHandle<()>)>, // Stores proposals for future rounds until the round is reached. queued_proposals: BTreeMap)>, - _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, + _network_broadcast_client: BroadcastTopicClient, + outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, } impl SequencerConsensusContext { pub fn new( batcher: Arc, - network_broadcast_client: BroadcastTopicClient, - _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, + _network_broadcast_client: BroadcastTopicClient, + outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, num_validators: u64, ) -> Self { Self { batcher, - network_broadcast_client, - _outbound_proposal_sender, + _network_broadcast_client, + outbound_proposal_sender, validators: (0..num_validators).map(ValidatorId::from).collect(), valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())), proposal_id: 0, @@ -145,11 +147,16 @@ impl ConsensusContext for SequencerConsensusContext { .await .expect("Failed to initiate proposal build"); debug!("Broadcasting proposal init: {proposal_init:?}"); - self.network_broadcast_client - .broadcast_message(ProposalPart::Init(proposal_init.clone())) + let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); + let stream_id = proposal_init.height.0; + self.outbound_proposal_sender + .send((stream_id, proposal_receiver)) + .await + .expect("Failed to send proposal receiver"); + proposal_sender + .send(ProposalPart::Init(proposal_init.clone())) .await - .expect("Failed to broadcast proposal init"); - let broadcast_client = self.network_broadcast_client.clone(); + .expect("Failed to send proposal init"); tokio::spawn( async move { stream_build_proposal( @@ -157,7 +164,7 @@ impl ConsensusContext for SequencerConsensusContext { proposal_id, batcher, valid_proposals, - broadcast_client, + proposal_sender, fin_sender, ) .await; @@ -350,16 +357,16 @@ impl SequencerConsensusContext { // Handles building a new proposal without blocking consensus: // 1. Receive chunks of content from the batcher. -// 2. Forward these to consensus to be streamed out to the network. +// 2. Forward these to the stream handler to be streamed out to the network. // 3. Once finished, receive the commitment from the batcher. // 4. Store the proposal for re-proposal. -// 5. Send the commitment to consensus. +// 5. Send the commitment to the stream handler (to send fin). async fn stream_build_proposal( height: BlockNumber, proposal_id: ProposalId, batcher: Arc, valid_proposals: Arc>, - mut broadcast_client: BroadcastTopicClient, + mut proposal_sender: mpsc::Sender, fin_sender: oneshot::Sender, ) { let mut content = Vec::new(); @@ -386,8 +393,8 @@ async fn stream_build_proposal( } debug!("Broadcasting proposal content: {transaction_hashes:?}"); trace!("Broadcasting proposal content: {transactions:?}"); - broadcast_client - .broadcast_message(ProposalPart::Transactions(TransactionBatch { + proposal_sender + .send(ProposalPart::Transactions(TransactionBatch { transactions, tx_hashes: transaction_hashes, })) @@ -405,8 +412,8 @@ async fn stream_build_proposal( height ); debug!("Broadcasting proposal fin: {proposal_content_id:?}"); - broadcast_client - .broadcast_message(ProposalPart::Fin(ProposalFin { proposal_content_id })) + proposal_sender + .send(ProposalPart::Fin(ProposalFin { proposal_content_id })) .await .expect("Failed to broadcast proposal fin"); // Update valid_proposals before sending fin to avoid a race condition diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 980c57fbab..8f718df3a0 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -9,10 +9,11 @@ use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::ConsensusContext; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, + BroadcastNetworkMock, TestSubscriberChannels, }; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::{ProposalInit, ProposalPart}; +use papyrus_protobuf::consensus::{ProposalInit, ProposalPart, StreamMessage}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::{ContractAddress, StateDiffCommitment}; use starknet_api::executable_transaction::{AccountTransaction, Transaction}; @@ -52,10 +53,12 @@ fn generate_invoke_tx(tx_hash: Felt) -> Transaction { }))) } -fn make_streaming_channels() --> (mpsc::Sender<(u64, mpsc::Receiver)>, mpsc::Receiver>) -{ - let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = +fn make_streaming_channels() -> ( + mpsc::Sender<(u64, mpsc::Receiver)>, + mpsc::Receiver>, + BroadcastNetworkMock>, +) { + let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: inbound_network_receiver, @@ -63,7 +66,7 @@ fn make_streaming_channels() } = subscriber_channels; let (outbound_internal_sender, inbound_internal_receiver, _) = StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); - (outbound_internal_sender, inbound_internal_receiver) + (outbound_internal_sender, inbound_internal_receiver, mock_network) } #[tokio::test] @@ -99,7 +102,8 @@ async fn build_proposal() { let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = + make_streaming_channels(); let mut context = SequencerConsensusContext::new( Arc::new(batcher), @@ -160,7 +164,8 @@ async fn validate_proposal_success() { let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = + make_streaming_channels(); let mut context = SequencerConsensusContext::new( Arc::new(batcher), @@ -210,8 +215,8 @@ async fn repropose() { let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let (outbound_internal_sender, _inbound_internal_receiver, _) = - StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = + make_streaming_channels(); let mut context = SequencerConsensusContext::new( Arc::new(batcher), @@ -281,7 +286,8 @@ async fn proposals_from_different_rounds() { let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = + make_streaming_channels(); let mut context = SequencerConsensusContext::new( Arc::new(batcher), @@ -364,7 +370,8 @@ async fn interrupt_active_proposal() { let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) = + make_streaming_channels(); let mut context = SequencerConsensusContext::new( Arc::new(batcher), diff --git a/crates/starknet_api/src/transaction.rs b/crates/starknet_api/src/transaction.rs index 480af09b08..02a97418f4 100644 --- a/crates/starknet_api/src/transaction.rs +++ b/crates/starknet_api/src/transaction.rs @@ -147,7 +147,7 @@ impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Tra ), _ => { unimplemented!("Unsupported transaction type. Only Invoke is currently supported.") - }, + } } } } diff --git a/crates/starknet_integration_tests/src/flow_test_setup.rs b/crates/starknet_integration_tests/src/flow_test_setup.rs index 500746aa7f..0954233cac 100644 --- a/crates/starknet_integration_tests/src/flow_test_setup.rs +++ b/crates/starknet_integration_tests/src/flow_test_setup.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::ProposalPart; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_gateway_types::errors::GatewaySpecError; @@ -33,7 +33,7 @@ pub struct FlowTestSetup { pub sequencer_node_handle: JoinHandle>, // Channels for consensus proposals, used for asserting the right transactions are proposed. - pub consensus_proposals_channels: BroadcastTopicChannels, + pub consensus_proposals_channels: BroadcastTopicChannels>, } impl FlowTestSetup { diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index ec5079a726..44728d293f 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -9,7 +9,7 @@ use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransac use papyrus_consensus::config::ConsensusConfig; use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::ProposalPart; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; use papyrus_storage::StorageConfig; use starknet_api::block::BlockNumber; use starknet_api::contract_address; @@ -42,7 +42,7 @@ pub async fn create_config( chain_info: ChainInfo, rpc_server_addr: SocketAddr, batcher_storage_config: StorageConfig, -) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels) { +) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels>) { let fee_token_addresses = chain_info.fee_token_addresses.clone(); let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone()); let gateway_config = create_gateway_config(chain_info.clone()).await; @@ -72,14 +72,16 @@ pub async fn create_config( fn create_consensus_manager_configs_and_channels( n_managers: usize, -) -> (Vec, BroadcastTopicChannels) { +) -> (Vec, BroadcastTopicChannels>) { let (network_configs, broadcast_channels) = create_network_configs_connected_to_broadcast_channels( n_managers, papyrus_network::gossipsub_impl::Topic::new( - starknet_consensus_manager::consensus_manager::NETWORK_TOPIC, + // TODO(guyn): return this to NETWORK_TOPIC once we have integrated streaming. + starknet_consensus_manager::consensus_manager::NETWORK_TOPIC2, ), ); + // TODO: Need to also add a channel for votes, in addition to the proposals channel. let consensus_manager_configs = network_configs .into_iter() diff --git a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs index 367a97c138..8e2c998cea 100644 --- a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs +++ b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs @@ -3,7 +3,13 @@ use std::collections::HashSet; use futures::StreamExt; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart}; +use papyrus_protobuf::consensus::{ + ProposalFin, + ProposalInit, + ProposalPart, + StreamMessage, + StreamMessageBody, +}; use papyrus_storage::test_utils::CHAIN_ID_FOR_TESTS; use pretty_assertions::assert_eq; use rstest::{fixture, rstest}; @@ -48,13 +54,12 @@ async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) { } async fn listen_to_broadcasted_messages( - consensus_proposals_channels: &mut BroadcastTopicChannels, + consensus_proposals_channels: &mut BroadcastTopicChannels>, expected_batched_tx_hashes: &[TransactionHash], ) { let chain_id = CHAIN_ID_FOR_TESTS.clone(); let broadcasted_messages_receiver = &mut consensus_proposals_channels.broadcasted_messages_receiver; - let mut received_tx_hashes = HashSet::new(); // TODO (Dan, Guy): retrieve / calculate the expected proposal init and fin. let expected_proposal_init = ProposalInit { height: BlockNumber(1), @@ -67,18 +72,32 @@ async fn listen_to_broadcasted_messages( "0x4597ceedbef644865917bf723184538ef70d43954d63f5b7d8cb9d1bd4c2c32", )), }; - assert_eq!( - broadcasted_messages_receiver.next().await.unwrap().0.unwrap(), - ProposalPart::Init(expected_proposal_init) - ); + + let StreamMessage { + stream_id: first_stream_id, + message: init_message, + message_id: incoming_message_id, + } = broadcasted_messages_receiver.next().await.unwrap().0.unwrap(); + + assert_eq!(incoming_message_id, 0); + let StreamMessageBody::Content(ProposalPart::Init(incoming_proposal_init)) = init_message + else { + panic!("Expected an init message. Got: {:?}", init_message) + }; + assert_eq!(incoming_proposal_init, expected_proposal_init); + + let mut received_tx_hashes = HashSet::new(); + let mut got_proposal_fin = false; + let mut got_channel_fin = false; loop { - match broadcasted_messages_receiver.next().await.unwrap().0.unwrap() { - ProposalPart::Init(init) => panic!("Unexpected init: {:?}", init), - ProposalPart::Fin(proposal_fin) => { - assert_eq!(proposal_fin, expected_proposal_fin); - break; + let StreamMessage { message, stream_id, message_id: _ } = + broadcasted_messages_receiver.next().await.unwrap().0.unwrap(); + assert_eq!(stream_id, first_stream_id); + match message { + StreamMessageBody::Content(ProposalPart::Init(init)) => { + panic!("Unexpected init: {:?}", init) } - ProposalPart::Transactions(transactions) => { + StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => { received_tx_hashes.extend( transactions .transactions @@ -86,8 +105,22 @@ async fn listen_to_broadcasted_messages( .map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()), ); } + StreamMessageBody::Content(ProposalPart::Fin(proposal_fin)) => { + assert_eq!(proposal_fin, expected_proposal_fin); + got_proposal_fin = true; + } + StreamMessageBody::Fin => { + got_channel_fin = true; + } + } + if got_proposal_fin + && got_channel_fin + && received_tx_hashes.len() == expected_batched_tx_hashes.len() + { + break; } } + // Using HashSet to ignore the order of the transactions (broadcast can lead to reordering). assert_eq!( received_tx_hashes,