diff --git a/Cargo.lock b/Cargo.lock index e934462c589..861998b3939 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7535,6 +7535,7 @@ dependencies = [ "papyrus_monitoring_gateway", "papyrus_network", "papyrus_p2p_sync", + "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "papyrus_sync", diff --git a/crates/papyrus_node/Cargo.toml b/crates/papyrus_node/Cargo.toml index fc8c9b25e43..776373225ae 100644 --- a/crates/papyrus_node/Cargo.toml +++ b/crates/papyrus_node/Cargo.toml @@ -36,6 +36,7 @@ papyrus_consensus_orchestrator.workspace = true papyrus_monitoring_gateway.workspace = true papyrus_network.workspace = true papyrus_p2p_sync.workspace = true +papyrus_protobuf.workspace = true papyrus_rpc = { workspace = true, optional = true } papyrus_storage.workspace = true papyrus_sync.workspace = true diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index 2ddd3297990..7c1e9e55482 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -13,14 +13,16 @@ use papyrus_common::pending_classes::PendingClasses; use papyrus_config::presentation::get_config_presentation; use papyrus_config::validators::config_validate; use papyrus_consensus::config::ConsensusConfig; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::NetworkManager; +use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; #[cfg(feature = "rpc")] use papyrus_rpc::run_server; use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; @@ -49,6 +51,8 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO; // different genesis hash. // TODO: Consider moving to a more general place. const GENESIS_HASH: &str = "0x0"; +// TODO(guyn): move this to the config. +pub const NETWORK_TOPIC: &str = "consensus_proposals"; // TODO(dvir): add this to config. // Duration between updates to the storage metrics (those in the collect_storage_metrics function). @@ -185,12 +189,25 @@ fn spawn_consensus( let network_channels = network_manager .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; + let proposal_network_channels: BroadcastTopicChannels> = + network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?; + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = proposal_network_channels; + + // TODO(Matan): receive the handle for the StreamHandler and pass it into run_consensus below. + let (outbound_internal_sender, inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let context = PapyrusConsensusContext::new( storage_reader.clone(), network_channels.broadcast_topic_client.clone(), + outbound_internal_sender, config.num_validators, None, ); + Ok(tokio::spawn(async move { Ok(papyrus_consensus::run_consensus( context, @@ -199,6 +216,7 @@ fn spawn_consensus( config.consensus_delay, config.timeouts.clone(), network_channels.into(), + inbound_internal_receiver, futures::stream::pending(), ) .await?) diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index 894b035b5ea..ae2f2abebd4 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -6,6 +6,7 @@ use starknet_api::transaction::{Transaction, TransactionHash}; use crate::converters::ProtobufConversionError; +// TODO(guyn): remove this once we integrate ProposalPart everywhere. #[derive(Debug, Default, Hash, Clone, Eq, PartialEq)] pub struct Proposal { pub height: u64, @@ -34,7 +35,7 @@ pub struct Vote { #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum ConsensusMessage { - Proposal(Proposal), + Proposal(Proposal), // To be deprecated Vote(Vote), } @@ -78,12 +79,12 @@ pub struct ProposalInit { pub struct TransactionBatch { /// The transactions in the batch. pub transactions: Vec, - // TODO(guyn): remove this once we settle how to convert transactions to ExecutableTransactions - /// The hashes of each transaction. + // TODO(guyn): remove this once we know how to get hashes as part of the compilation. + /// The transaction's hashes. pub tx_hashes: Vec, } -/// The propsal is done when receiving this fin message, which contains the block hash. +/// The proposal is done when receiving this fin message, which contains the block hash. #[derive(Debug, Clone, PartialEq)] pub struct ProposalFin { /// The block hash of the proposed block. @@ -102,6 +103,27 @@ pub enum ProposalPart { Fin(ProposalFin), } +impl TryInto for ProposalPart { + type Error = ProtobufConversionError; + + fn try_into(self: ProposalPart) -> Result { + match self { + ProposalPart::Init(init) => Ok(init), + _ => Err(ProtobufConversionError::WrongEnumVariant { + type_description: "ProposalPart", + expected: "Init", + value_as_str: format!("{:?}", self), + }), + } + } +} + +impl From for ProposalPart { + fn from(value: ProposalInit) -> Self { + ProposalPart::Init(value) + } +} + impl std::fmt::Display for StreamMessage where T: Clone + Into> + TryFrom, Error = ProtobufConversionError>, diff --git a/crates/papyrus_protobuf/src/converters/consensus.rs b/crates/papyrus_protobuf/src/converters/consensus.rs index 178c7c95892..708a9ee3b99 100644 --- a/crates/papyrus_protobuf/src/converters/consensus.rs +++ b/crates/papyrus_protobuf/src/converters/consensus.rs @@ -227,6 +227,8 @@ impl From for protobuf::ProposalInit { auto_impl_into_and_try_from_vec_u8!(ProposalInit, protobuf::ProposalInit); +// TODO(guyn): remove tx_hashes once we know how to compile the hashes +// when making the executable transactions. impl TryFrom for TransactionBatch { type Error = ProtobufConversionError; fn try_from(value: protobuf::TransactionBatch) -> Result { @@ -311,6 +313,7 @@ impl From for protobuf::ProposalPart { auto_impl_into_and_try_from_vec_u8!(ProposalPart, protobuf::ProposalPart); +// TODO(guyn): remove this once we are happy with how proposals are sent separate from votes. impl TryFrom for ConsensusMessage { type Error = ProtobufConversionError; diff --git a/crates/papyrus_protobuf/src/converters/mod.rs b/crates/papyrus_protobuf/src/converters/mod.rs index a72d774d25d..ed2fee8ba55 100644 --- a/crates/papyrus_protobuf/src/converters/mod.rs +++ b/crates/papyrus_protobuf/src/converters/mod.rs @@ -22,6 +22,12 @@ pub enum ProtobufConversionError { MissingField { field_description: &'static str }, #[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")] BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec }, + #[error("Type `{type_description}` got unexpected enum variant {value_as_str}")] + WrongEnumVariant { + type_description: &'static str, + value_as_str: String, + expected: &'static str, + }, #[error(transparent)] DecodeError(#[from] DecodeError), /// For CompressionError and serde_json::Error we put the string of the error instead of the diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 5dfa5c7ad52..fe5ad992dac 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -37,13 +37,14 @@ pub async fn run_consensus( consensus_delay: Duration, timeouts: TimeoutsConfig, mut broadcast_channels: BroadcastConsensusMessageChannel, + mut inbound_proposal_receiver: mpsc::Receiver>, mut sync_receiver: SyncReceiverT, ) -> Result<(), ConsensusError> where ContextT: ConsensusContext, - SyncReceiverT: Stream + Unpin, ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, + SyncReceiverT: Stream + Unpin, { info!( "Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}", @@ -61,7 +62,12 @@ where loop { metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64); - let run_height = manager.run_height(&mut context, current_height, &mut broadcast_channels); + let run_height = manager.run_height( + &mut context, + current_height, + &mut broadcast_channels, + &mut inbound_proposal_receiver, + ); // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop // it. We also cannot restart the height; when we dropped the future we dropped the state it @@ -106,6 +112,7 @@ impl MultiHeightManager { context: &mut ContextT, height: BlockNumber, broadcast_channels: &mut BroadcastConsensusMessageChannel, + proposal_receiver: &mut mpsc::Receiver>, ) -> Result where ContextT: ConsensusContext, @@ -186,6 +193,7 @@ impl MultiHeightManager { match message { ConsensusMessage::Proposal(proposal) => { // Special case due to fake streaming. + // TODO(guyn): this will be gone once we integrate the proposal channels. let (proposal_init, content_receiver, fin_receiver) = ProposalWrapper(proposal).into(); let res = shc @@ -224,9 +232,7 @@ impl MultiHeightManager { async fn next_message( cached_messages: &mut Vec, broadcast_channels: &mut BroadcastConsensusMessageChannel, -) -> Result -where -{ +) -> Result { let BroadcastConsensusMessageChannel { broadcasted_messages_receiver, broadcast_topic_client } = broadcast_channels; if let Some(msg) = cached_messages.pop() { diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 99f8a01087c..e4f03e65b03 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -13,7 +13,7 @@ use papyrus_network::network_manager::test_utils::{ TestSubscriberChannels, }; use papyrus_network_types::network_types::BroadcastedMessageMetadata; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalPart, Vote}; use papyrus_test_utils::{get_rng, GetTestInstance}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::transaction::Transaction; @@ -32,12 +32,15 @@ lazy_static! { static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default(); } +const CHANNEL_SIZE: usize = 10; + mock! { pub TestContext {} #[async_trait] impl ConsensusContext for TestContext { type ProposalChunk = Transaction; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, @@ -87,6 +90,11 @@ async fn manager_multiple_heights_unordered() { let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().unwrap(); let mut sender = mock_network.broadcasted_messages_sender; + + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) = + mpsc::channel(CHANNEL_SIZE); + // Send messages for height 2 followed by those for height 1. send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await; send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; @@ -112,8 +120,15 @@ async fn manager_multiple_heights_unordered() { let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone()); let mut subscriber_channels = subscriber_channels.into(); - let decision = - manager.run_height(&mut context, BlockNumber(1), &mut subscriber_channels).await.unwrap(); + let decision = manager + .run_height( + &mut context, + BlockNumber(1), + &mut subscriber_channels, + &mut proposal_receiver_receiver, + ) + .await + .unwrap(); assert_eq!(decision.block, BlockHash(Felt::ONE)); // Run the manager for height 2. @@ -125,8 +140,15 @@ async fn manager_multiple_heights_unordered() { block_receiver }) .times(1); - let decision = - manager.run_height(&mut context, BlockNumber(2), &mut subscriber_channels).await.unwrap(); + let decision = manager + .run_height( + &mut context, + BlockNumber(2), + &mut subscriber_channels, + &mut proposal_receiver_receiver, + ) + .await + .unwrap(); assert_eq!(decision.block, BlockHash(Felt::TWO)); } @@ -136,6 +158,9 @@ async fn run_consensus_sync() { let mut context = MockTestContext::new(); let (decision_tx, decision_rx) = oneshot::channel(); + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + context.expect_validate_proposal().return_once(move |_, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::TWO)).unwrap(); @@ -170,6 +195,7 @@ async fn run_consensus_sync() { Duration::ZERO, TIMEOUTS.clone(), subscriber_channels.into(), + proposal_receiver_receiver, &mut sync_receiver, ) .await @@ -196,6 +222,9 @@ async fn run_consensus_sync_cancellation_safety() { let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel(); let (decision_tx, decision_rx) = oneshot::channel(); + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + context.expect_validate_proposal().return_once(move |_, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::ONE)).unwrap(); @@ -230,6 +259,7 @@ async fn run_consensus_sync_cancellation_safety() { Duration::ZERO, TIMEOUTS.clone(), subscriber_channels.into(), + proposal_receiver_receiver, &mut sync_receiver, ) .await @@ -260,6 +290,11 @@ async fn test_timeouts() { let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().unwrap(); let mut sender = mock_network.broadcasted_messages_sender; + + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) = + mpsc::channel(CHANNEL_SIZE); + send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await; send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await; send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await; @@ -293,7 +328,12 @@ async fn test_timeouts() { let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone()); let manager_handle = tokio::spawn(async move { let decision = manager - .run_height(&mut context, BlockNumber(1), &mut subscriber_channels.into()) + .run_height( + &mut context, + BlockNumber(1), + &mut subscriber_channels.into(), + &mut proposal_receiver_receiver, + ) .await .unwrap(); assert_eq!(decision.block, BlockHash(Felt::ONE)); diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs index 0bd7250f122..4bd575da8df 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs @@ -83,7 +83,7 @@ mod tests { mock_register_broadcast_topic().unwrap(); let network_sender_to_inbound = mock_network.broadcasted_messages_sender; - // The inbound_receiver is given to StreamHandler to inbound to mock network messages. + // The inbound_receiver is given to StreamHandler to mock network messages. let BroadcastTopicChannels { broadcasted_messages_receiver: inbound_receiver, broadcast_topic_client: _, diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 6c91eae0450..37e51af60d1 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -3,7 +3,14 @@ use std::time::Duration; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use mockall::mock; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, Vote, VoteType}; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + Proposal, + ProposalInit, + ProposalPart, + Vote, + VoteType, +}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_types_core::felt::Felt; @@ -23,6 +30,7 @@ mock! { #[async_trait] impl ConsensusContext for TestContext { type ProposalChunk = u32; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index 2082b23c2c3..e1b8ce69362 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -29,7 +29,13 @@ pub trait ConsensusContext { /// The chunks of content returned when iterating the proposal. // In practice I expect this to match the type sent to the network // (papyrus_protobuf::ConsensusMessage), and not to be specific to just the block's content. - type ProposalChunk; + type ProposalChunk; // TODO(guyn): deprecate this (and replace by ProposalPart) + type ProposalPart: TryFrom, Error = ProtobufConversionError> + + Into> + + TryInto + + From + + Clone + + Send; // TODO(matan): The oneshot for receiving the build block could be generalized to just be some // future which returns a block. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 2fc426d8ec6..7e471a2ebc6 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -23,7 +23,7 @@ use papyrus_consensus::types::{ ValidatorId, }; use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, Vote}; +use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote}; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader}; @@ -39,6 +39,7 @@ type HeightToIdToContent = BTreeMap, + _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, validators: Vec, sync_broadcast_sender: Option>, // Proposal building/validating returns immediately, leaving the actual processing to a spawned @@ -51,12 +52,14 @@ impl PapyrusConsensusContext { pub fn new( storage_reader: StorageReader, network_broadcast_client: BroadcastTopicClient, + _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, num_validators: u64, sync_broadcast_sender: Option>, ) -> Self { Self { storage_reader, network_broadcast_client, + _network_proposal_sender, validators: (0..num_validators).map(ContractAddress::from).collect(), sync_broadcast_sender, valid_proposals: Arc::new(Mutex::new(BTreeMap::new())), @@ -67,10 +70,11 @@ impl PapyrusConsensusContext { #[async_trait] impl ConsensusContext for PapyrusConsensusContext { type ProposalChunk = Transaction; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, - init: ProposalInit, + proposal_init: ProposalInit, _timeout: Duration, ) -> oneshot::Receiver { let mut network_broadcast_sender = self.network_broadcast_client.clone(); @@ -83,39 +87,39 @@ impl ConsensusContext for PapyrusConsensusContext { // TODO(dvir): consider fix this for the case of reverts. If between the check that // the block in storage and to getting the transaction was a revert // this flow will fail. - wait_for_block(&storage_reader, init.height) + wait_for_block(&storage_reader, proposal_init.height) .await .expect("Failed to wait to block"); let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); let transactions = txn - .get_block_transactions(init.height) + .get_block_transactions(proposal_init.height) .expect("Get transactions from storage failed") .unwrap_or_else(|| { panic!( "Block in {} was not found in storage despite waiting for it", - init.height + proposal_init.height ) }); let block_hash = txn - .get_block_header(init.height) + .get_block_header(proposal_init.height) .expect("Get header from storage failed") .unwrap_or_else(|| { panic!( "Block in {} was not found in storage despite waiting for it", - init.height + proposal_init.height ) }) .block_hash; let proposal = Proposal { - height: init.height.0, - round: init.round, - proposer: init.proposer, + height: proposal_init.height.0, + round: proposal_init.round, + proposer: proposal_init.proposer, transactions: transactions.clone(), block_hash, - valid_round: init.valid_round, + valid_round: proposal_init.valid_round, }; network_broadcast_sender .broadcast_message(ConsensusMessage::Proposal(proposal)) @@ -125,7 +129,10 @@ impl ConsensusContext for PapyrusConsensusContext { let mut proposals = valid_proposals .lock() .expect("Lock on active proposals was poisoned due to a previous panic"); - proposals.entry(init.height).or_default().insert(block_hash, transactions); + proposals + .entry(proposal_init.height) + .or_default() + .insert(block_hash, transactions); } // Done after inserting the proposal into the map to avoid race conditions between // insertion and calls to `repropose`. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index fdcd4c3ad03..25712fe2a70 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -2,12 +2,21 @@ use std::time::Duration; use futures::channel::{mpsc, oneshot}; use futures::StreamExt; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::ConsensusContext; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, BroadcastNetworkMock, + TestSubscriberChannels, +}; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + ProposalInit, + ProposalPart, + StreamMessage, + Vote, }; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote}; use papyrus_storage::body::BodyStorageWriter; use papyrus_storage::header::HeaderStorageWriter; use papyrus_storage::test_utils::get_test_storage; @@ -107,10 +116,21 @@ fn test_setup() -> ( .unwrap(); let network_channels = mock_register_broadcast_topic().unwrap(); + let network_proposal_channels: TestSubscriberChannels> = + mock_register_broadcast_topic().unwrap(); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = network_proposal_channels.subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let sync_channels = mock_register_broadcast_topic().unwrap(); + let papyrus_context = PapyrusConsensusContext::new( storage_reader.clone(), network_channels.subscriber_channels.broadcast_topic_client, + outbound_internal_sender, 4, Some(sync_channels.subscriber_channels.broadcast_topic_client), ); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index e6715231bcd..136e29d678f 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -75,17 +75,20 @@ pub struct SequencerConsensusContext { active_proposal: Option<(Arc, JoinHandle<()>)>, // Stores proposals for future rounds until the round is reached. queued_proposals: BTreeMap)>, + _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, } impl SequencerConsensusContext { pub fn new( batcher: Arc, network_broadcast_client: BroadcastTopicClient, + _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, num_validators: u64, ) -> Self { Self { batcher, network_broadcast_client, + _outbound_proposal_sender, validators: (0..num_validators).map(ValidatorId::from).collect(), valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())), proposal_id: 0, @@ -99,8 +102,9 @@ impl SequencerConsensusContext { #[async_trait] impl ConsensusContext for SequencerConsensusContext { - // TODO: Switch to ProposalPart when Guy merges the PR. + // TODO(guyn): Switch to ProposalPart when done with the streaming integration. type ProposalChunk = Vec; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 58c89780221..fdf5825c614 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -5,13 +5,14 @@ use std::vec; use futures::channel::mpsc; use futures::{FutureExt, SinkExt}; use lazy_static::lazy_static; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::ConsensusContext; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, TestSubscriberChannels, }; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::ProposalInit; +use papyrus_protobuf::consensus::{ProposalInit, ProposalPart}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::{ContractAddress, StateDiffCommitment}; use starknet_api::executable_transaction::{AccountTransaction, Transaction}; @@ -51,6 +52,20 @@ fn generate_invoke_tx(tx_hash: Felt) -> Transaction { }))) } +fn make_streaming_channels() +-> (mpsc::Sender<(u64, mpsc::Receiver)>, mpsc::Receiver>) +{ + let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = subscriber_channels; + let (outbound_internal_sender, inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + (outbound_internal_sender, inbound_internal_receiver) +} + #[tokio::test] async fn build_proposal() { let mut batcher = MockBatcherClient::new(); @@ -78,12 +93,20 @@ async fn build_proposal() { }), }) }); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); let init = ProposalInit { height: BlockNumber(0), round: 0, @@ -131,14 +154,23 @@ async fn validate_proposal_success() { }) }, ); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await; + let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver = @@ -172,12 +204,20 @@ async fn repropose() { }) }, ); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await; @@ -239,8 +279,15 @@ async fn proposals_from_different_rounds() { mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await; context.set_height_and_round(BlockNumber(0), 1).await; @@ -315,8 +362,15 @@ async fn interrupt_active_proposal() { mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels(); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); // Initialize the context for a specific height, starting with round 0. context.set_height_and_round(BlockNumber(0), 0).await; diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs index 1ee091bff29..714c07bad5f 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager.rs @@ -6,12 +6,17 @@ use futures::channel::mpsc::{self, SendError}; use futures::future::Ready; use futures::SinkExt; use libp2p::PeerId; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError}; use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager}; +use papyrus_network::network_manager::{ + BroadcastTopicChannels, + BroadcastTopicClient, + NetworkManager, +}; use papyrus_network_types::network_types::BroadcastedMessageMetadata; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, StreamMessage}; use starknet_batcher_types::communication::SharedBatcherClient; use starknet_sequencer_infra::component_definitions::ComponentStarter; use starknet_sequencer_infra::errors::ComponentError; @@ -22,6 +27,8 @@ use crate::config::ConsensusManagerConfig; // TODO(Dan, Guy): move to config. pub const BROADCAST_BUFFER_SIZE: usize = 100; pub const NETWORK_TOPIC: &str = "consensus_proposals"; +// TODO(guyn): remove this once we have integrated streaming. +pub const NETWORK_TOPIC2: &str = "streamed_consensus_proposals"; #[derive(Clone)] pub struct ConsensusManager { @@ -37,15 +44,33 @@ impl ConsensusManager { pub async fn run(&self) -> Result<(), ConsensusError> { let mut network_manager = NetworkManager::new(self.config.consensus_config.network_config.clone(), None); - let proposals_broadcast_channels = network_manager + + // TODO(guyn): remove this channel once we have integrated streaming. + let old_proposals_broadcast_channels = network_manager .register_broadcast_topic::( Topic::new(NETWORK_TOPIC), BROADCAST_BUFFER_SIZE, ) .expect("Failed to register broadcast topic"); + + let proposals_broadcast_channels = network_manager + .register_broadcast_topic::>( + Topic::new(NETWORK_TOPIC2), + BROADCAST_BUFFER_SIZE, + ) + .expect("Failed to register broadcast topic"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = proposals_broadcast_channels; + + let (outbound_internal_sender, inbound_internal_receiver, mut stream_handler_task_handle) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let context = SequencerConsensusContext::new( Arc::clone(&self.batcher_client), - proposals_broadcast_channels.broadcast_topic_client.clone(), + old_proposals_broadcast_channels.broadcast_topic_client.clone(), + outbound_internal_sender, self.config.consensus_config.num_validators, ); @@ -57,6 +82,7 @@ impl ConsensusManager { self.config.consensus_config.consensus_delay, self.config.consensus_config.timeouts.clone(), create_fake_network_channels(), + inbound_internal_receiver, futures::stream::pending(), ); @@ -70,6 +96,9 @@ impl ConsensusManager { network_result = &mut network_handle => { panic!("Consensus' network task finished unexpectedly: {:?}", network_result); } + stream_handler_result = &mut stream_handler_task_handle => { + panic!("Consensus' stream handler task finished unexpectedly: {:?}", stream_handler_result); + } } } }