From 507ff6eb7dbe25485a5d9a742456c30408a93461 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Mon, 12 Aug 2024 13:25:26 +0300 Subject: [PATCH] refactor(consensus): consensus handles its network registrations on its own This is because the network registration logic will become more complex with test sync. In practice this means that after the initial network config we hold off on spawning the task until all tasks have been set up. --- Cargo.lock | 1 - crates/papyrus_node/Cargo.toml | 1 - crates/papyrus_node/src/main.rs | 125 ++++++++---------- .../papyrus_consensus/src/manager.rs | 2 + .../src/papyrus_consensus_context.rs | 9 +- 5 files changed, 61 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ceb29be59d5..9e1ca03e740 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6330,7 +6330,6 @@ dependencies = [ "papyrus_monitoring_gateway", "papyrus_network", "papyrus_p2p_sync", - "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "papyrus_sync", diff --git a/crates/papyrus_node/Cargo.toml b/crates/papyrus_node/Cargo.toml index e160c76d7eb..5b58eb9a995 100644 --- a/crates/papyrus_node/Cargo.toml +++ b/crates/papyrus_node/Cargo.toml @@ -32,7 +32,6 @@ papyrus_consensus.workspace = true papyrus_monitoring_gateway.workspace = true papyrus_network.workspace = true papyrus_p2p_sync.workspace = true -papyrus_protobuf.workspace = true papyrus_rpc = { workspace = true, optional = true } papyrus_storage.workspace = true papyrus_sync.workspace = true diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index 3817b2b9971..eb78a3c613f 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,8 +7,6 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; -use futures::channel::mpsc; -use futures::future::BoxFuture; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; @@ -23,7 +21,7 @@ use papyrus_consensus::simulation_network_receiver::NetworkReceiver; use papyrus_consensus::types::ConsensusError; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError}; +use papyrus_network::network_manager::NetworkManager; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_node::config::NodeConfig; use papyrus_node::version::VERSION_FULL; @@ -35,7 +33,6 @@ use papyrus_p2p_sync::client::{ }; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; -use papyrus_protobuf::consensus::ConsensusMessage; #[cfg(feature = "rpc")] use papyrus_rpc::run_server; use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; @@ -98,46 +95,48 @@ async fn create_rpc_server_future( } fn run_consensus( - config: ConsensusConfig, + config: Option<&ConsensusConfig>, storage_reader: StorageReader, - consensus_channels: BroadcastSubscriberChannels, + network_manager: Option<&mut NetworkManager>, ) -> anyhow::Result>> { - let validator_id = config.validator_id; - info!("Running consensus as validator {validator_id}"); + let (Some(config), Some(network_manager)) = (config, network_manager) else { + info!("Consensus is disabled."); + return Ok(tokio::spawn(pending())); + }; + + let network_channels = network_manager + .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; let context = PapyrusConsensusContext::new( storage_reader.clone(), - consensus_channels.messages_to_broadcast_sender, + network_channels.messages_to_broadcast_sender, config.num_validators, ); - let start_height = config.start_height; // TODO(matan): connect this to an actual channel. - let (_, sync_rx) = mpsc::channel(1); - match config.test { - Some(test_config) => { - let network_receiver = NetworkReceiver::new( - consensus_channels.broadcasted_messages_receiver, - test_config.cache_size, - test_config.random_seed, - test_config.drop_probability, - test_config.invalid_probability, - ); - Ok(tokio::spawn(papyrus_consensus::run_consensus( - context, - start_height, - validator_id, - config.consensus_delay, - network_receiver, - sync_rx, - ))) - } - None => Ok(tokio::spawn(papyrus_consensus::run_consensus( + if let Some(test_config) = config.test.as_ref() { + let network_receiver = NetworkReceiver::new( + network_channels.broadcasted_messages_receiver, + test_config.cache_size, + test_config.random_seed, + test_config.drop_probability, + test_config.invalid_probability, + ); + Ok(tokio::spawn(papyrus_consensus::run_consensus( context, - start_height, - validator_id, + config.start_height, + config.validator_id, config.consensus_delay, - consensus_channels.broadcasted_messages_receiver, - sync_rx, - ))), + network_receiver, + futures::stream::pending(), + ))) + } else { + Ok(tokio::spawn(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + network_channels.broadcasted_messages_receiver, + futures::stream::pending(), + ))) } } @@ -152,13 +151,22 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { // P2P network. let ( - network_future, + mut maybe_network_manager, maybe_sync_client_channels, maybe_sync_server_channels, - maybe_consensus_channels, local_peer_id, - ) = run_network(config.network.clone(), config.consensus.clone())?; - let network_handle = tokio::spawn(network_future); + ) = register_to_network(config.network.clone())?; + let consensus_handle = run_consensus( + config.consensus.as_ref(), + storage_reader.clone(), + maybe_network_manager.as_mut(), + )?; + let network_handle = tokio::spawn(async move { + match maybe_network_manager { + Some(manager) => manager.run().boxed().await, + None => pending().boxed().await, + } + }); // Monitoring server. let monitoring_server = MonitoringServer::new( @@ -236,16 +244,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { let sync_handle = tokio::spawn(sync_future); let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future); - let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels { - run_consensus( - config.consensus.expect("If consensus_channels is Some, consensus must be Some too."), - storage_reader.clone(), - consensus_channels, - )? - } else { - tokio::spawn(pending()) - }; - tokio::select! { res = storage_metrics_handle => { error!("collecting storage metrics stopped."); @@ -329,20 +327,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { } } -type NetworkRunReturn = ( - BoxFuture<'static, Result<(), NetworkError>>, - Option, - Option, - Option>, - String, -); - -fn run_network( - network_config: Option, - consensus_config: Option, -) -> anyhow::Result { +type NetworkRunReturn = + (Option, Option, Option, String); + +fn register_to_network(network_config: Option) -> anyhow::Result { let Some(network_config) = network_config else { - return Ok((pending().boxed(), None, None, None, "".to_string())); + return Ok((None, None, None, "".to_string())); }; let mut network_manager = network_manager::NetworkManager::new(network_config.clone()); let local_peer_id = network_manager.get_local_peer_id(); @@ -364,14 +354,6 @@ fn run_network( let event_server_channel = network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE); - let consensus_channels = - match consensus_config { - Some(consensus_config) => Some(network_manager.register_broadcast_topic( - Topic::new(consensus_config.network_topic), - BUFFER_SIZE, - )?), - None => None, - }; let p2p_sync_client_channels = P2PSyncClientChannels::new( header_client_sender, state_diff_client_sender, @@ -386,10 +368,9 @@ fn run_network( ); Ok(( - network_manager.run().boxed(), + Some(network_manager), Some(p2p_sync_client_channels), Some(p2p_sync_server_channels), - consensus_channels, local_peer_id, )) } diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 9defc700228..b87c25059b4 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -46,6 +46,8 @@ where ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { + info!("Running consensus"); + // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error tokio::time::sleep(consensus_delay).await; let mut current_height = start_height; diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs index 3af428ce821..67e81ef0c17 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs @@ -17,7 +17,7 @@ use papyrus_storage::{StorageError, StorageReader}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::ContractAddress; use starknet_api::transaction::Transaction; -use tracing::{debug, debug_span, info, Instrument}; +use tracing::{debug, debug_span, info, warn, Instrument}; use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; use crate::ProposalWrapper; @@ -199,8 +199,11 @@ impl ConsensusContext for PapyrusConsensusContext { transactions.push(tx); } - let block_hash = - fin_receiver.await.expect("Failed to get block hash from fin receiver"); + let Ok(block_hash) = fin_receiver.await else { + // This can occur due to sync interrupting a height. + warn!("Failed to get block hash from fin receiver. {init:?}"); + return; + }; let proposal = Proposal { height: init.height.0, round: init.round,