From 18d710cc4616f59b55b8d48121bb2b091b4737ac Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Mon, 12 Aug 2024 13:25:26 +0300 Subject: [PATCH] refactor(consensus): consensus handles its network registrations on its own This is because the network registration logic will become more complex with test sync. In practice this means that after the initial network config we hold off on spawning the task until all tasks have been set up. --- Cargo.lock | 1 - crates/papyrus_node/Cargo.toml | 1 - crates/papyrus_node/src/main.rs | 125 ++++++++---------- .../papyrus_consensus/src/manager.rs | 2 + .../src/papyrus_consensus_context.rs | 9 +- 5 files changed, 61 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5dec8d9a29..415a224096 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6330,7 +6330,6 @@ dependencies = [ "papyrus_monitoring_gateway", "papyrus_network", "papyrus_p2p_sync", - "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "papyrus_sync", diff --git a/crates/papyrus_node/Cargo.toml b/crates/papyrus_node/Cargo.toml index e160c76d7e..5b58eb9a99 100644 --- a/crates/papyrus_node/Cargo.toml +++ b/crates/papyrus_node/Cargo.toml @@ -32,7 +32,6 @@ papyrus_consensus.workspace = true papyrus_monitoring_gateway.workspace = true papyrus_network.workspace = true papyrus_p2p_sync.workspace = true -papyrus_protobuf.workspace = true papyrus_rpc = { workspace = true, optional = true } papyrus_storage.workspace = true papyrus_sync.workspace = true diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index 3817b2b997..eb78a3c613 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,8 +7,6 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; -use futures::channel::mpsc; -use futures::future::BoxFuture; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; @@ -23,7 +21,7 @@ use papyrus_consensus::simulation_network_receiver::NetworkReceiver; use papyrus_consensus::types::ConsensusError; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError}; +use papyrus_network::network_manager::NetworkManager; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_node::config::NodeConfig; use papyrus_node::version::VERSION_FULL; @@ -35,7 +33,6 @@ use papyrus_p2p_sync::client::{ }; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; -use papyrus_protobuf::consensus::ConsensusMessage; #[cfg(feature = "rpc")] use papyrus_rpc::run_server; use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; @@ -98,46 +95,48 @@ async fn create_rpc_server_future( } fn run_consensus( - config: ConsensusConfig, + config: Option<&ConsensusConfig>, storage_reader: StorageReader, - consensus_channels: BroadcastSubscriberChannels, + network_manager: Option<&mut NetworkManager>, ) -> anyhow::Result>> { - let validator_id = config.validator_id; - info!("Running consensus as validator {validator_id}"); + let (Some(config), Some(network_manager)) = (config, network_manager) else { + info!("Consensus is disabled."); + return Ok(tokio::spawn(pending())); + }; + + let network_channels = network_manager + .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; let context = PapyrusConsensusContext::new( storage_reader.clone(), - consensus_channels.messages_to_broadcast_sender, + network_channels.messages_to_broadcast_sender, config.num_validators, ); - let start_height = config.start_height; // TODO(matan): connect this to an actual channel. - let (_, sync_rx) = mpsc::channel(1); - match config.test { - Some(test_config) => { - let network_receiver = NetworkReceiver::new( - consensus_channels.broadcasted_messages_receiver, - test_config.cache_size, - test_config.random_seed, - test_config.drop_probability, - test_config.invalid_probability, - ); - Ok(tokio::spawn(papyrus_consensus::run_consensus( - context, - start_height, - validator_id, - config.consensus_delay, - network_receiver, - sync_rx, - ))) - } - None => Ok(tokio::spawn(papyrus_consensus::run_consensus( + if let Some(test_config) = config.test.as_ref() { + let network_receiver = NetworkReceiver::new( + network_channels.broadcasted_messages_receiver, + test_config.cache_size, + test_config.random_seed, + test_config.drop_probability, + test_config.invalid_probability, + ); + Ok(tokio::spawn(papyrus_consensus::run_consensus( context, - start_height, - validator_id, + config.start_height, + config.validator_id, config.consensus_delay, - consensus_channels.broadcasted_messages_receiver, - sync_rx, - ))), + network_receiver, + futures::stream::pending(), + ))) + } else { + Ok(tokio::spawn(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + network_channels.broadcasted_messages_receiver, + futures::stream::pending(), + ))) } } @@ -152,13 +151,22 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { // P2P network. let ( - network_future, + mut maybe_network_manager, maybe_sync_client_channels, maybe_sync_server_channels, - maybe_consensus_channels, local_peer_id, - ) = run_network(config.network.clone(), config.consensus.clone())?; - let network_handle = tokio::spawn(network_future); + ) = register_to_network(config.network.clone())?; + let consensus_handle = run_consensus( + config.consensus.as_ref(), + storage_reader.clone(), + maybe_network_manager.as_mut(), + )?; + let network_handle = tokio::spawn(async move { + match maybe_network_manager { + Some(manager) => manager.run().boxed().await, + None => pending().boxed().await, + } + }); // Monitoring server. let monitoring_server = MonitoringServer::new( @@ -236,16 +244,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { let sync_handle = tokio::spawn(sync_future); let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future); - let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels { - run_consensus( - config.consensus.expect("If consensus_channels is Some, consensus must be Some too."), - storage_reader.clone(), - consensus_channels, - )? - } else { - tokio::spawn(pending()) - }; - tokio::select! { res = storage_metrics_handle => { error!("collecting storage metrics stopped."); @@ -329,20 +327,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { } } -type NetworkRunReturn = ( - BoxFuture<'static, Result<(), NetworkError>>, - Option, - Option, - Option>, - String, -); - -fn run_network( - network_config: Option, - consensus_config: Option, -) -> anyhow::Result { +type NetworkRunReturn = + (Option, Option, Option, String); + +fn register_to_network(network_config: Option) -> anyhow::Result { let Some(network_config) = network_config else { - return Ok((pending().boxed(), None, None, None, "".to_string())); + return Ok((None, None, None, "".to_string())); }; let mut network_manager = network_manager::NetworkManager::new(network_config.clone()); let local_peer_id = network_manager.get_local_peer_id(); @@ -364,14 +354,6 @@ fn run_network( let event_server_channel = network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE); - let consensus_channels = - match consensus_config { - Some(consensus_config) => Some(network_manager.register_broadcast_topic( - Topic::new(consensus_config.network_topic), - BUFFER_SIZE, - )?), - None => None, - }; let p2p_sync_client_channels = P2PSyncClientChannels::new( header_client_sender, state_diff_client_sender, @@ -386,10 +368,9 @@ fn run_network( ); Ok(( - network_manager.run().boxed(), + Some(network_manager), Some(p2p_sync_client_channels), Some(p2p_sync_server_channels), - consensus_channels, local_peer_id, )) } diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index f106155002..80df134a3c 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -46,6 +46,8 @@ where ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { + info!("Running consensus"); + // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error tokio::time::sleep(consensus_delay).await; let mut current_height = start_height; diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs index 642b63fbf7..9aec17055c 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs @@ -17,7 +17,7 @@ use papyrus_storage::{StorageError, StorageReader}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::ContractAddress; use starknet_api::transaction::Transaction; -use tracing::{debug, debug_span, info, Instrument}; +use tracing::{debug, debug_span, info, warn, Instrument}; use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; use crate::ProposalWrapper; @@ -199,8 +199,11 @@ impl ConsensusContext for PapyrusConsensusContext { transactions.push(tx); } - let block_hash = - fin_receiver.await.expect("Failed to get block hash from fin receiver"); + let Ok(block_hash) = fin_receiver.await else { + // This can occur due to sync interrupting a height. + warn!("Failed to get block hash from fin receiver. {init:?}"); + return; + }; let proposal = Proposal { height: init.height.0, round: init.round,