From 62cf87346ce1644acd4cd68f53375b14c4f8c6f5 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Mon, 12 Aug 2024 13:25:26 +0300 Subject: [PATCH] refactor(consensus): consensus handles its network registrations on its own This is because the network registration logic will become more complex with test sync. In practice this means that after the initial network config we hold off on spawning the network manager task until consensus has also registered its topic with it. --- Cargo.lock | 1 - crates/papyrus_node/Cargo.toml | 1 - crates/papyrus_node/src/main.rs | 122 ++++++++---------- .../papyrus_consensus/src/manager.rs | 2 + .../src/papyrus_consensus_context.rs | 9 +- 5 files changed, 61 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5dec8d9a29f..415a224096f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6330,7 +6330,6 @@ dependencies = [ "papyrus_monitoring_gateway", "papyrus_network", "papyrus_p2p_sync", - "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "papyrus_sync", diff --git a/crates/papyrus_node/Cargo.toml b/crates/papyrus_node/Cargo.toml index e160c76d7eb..5b58eb9a995 100644 --- a/crates/papyrus_node/Cargo.toml +++ b/crates/papyrus_node/Cargo.toml @@ -32,7 +32,6 @@ papyrus_consensus.workspace = true papyrus_monitoring_gateway.workspace = true papyrus_network.workspace = true papyrus_p2p_sync.workspace = true -papyrus_protobuf.workspace = true papyrus_rpc = { workspace = true, optional = true } papyrus_storage.workspace = true papyrus_sync.workspace = true diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index d371bd90be8..eb78a3c613f 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,7 +7,6 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; -use futures::future::BoxFuture; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; @@ -22,7 +21,7 @@ use
papyrus_consensus::simulation_network_receiver::NetworkReceiver; use papyrus_consensus::types::ConsensusError; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError}; +use papyrus_network::network_manager::NetworkManager; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_node::config::NodeConfig; use papyrus_node::version::VERSION_FULL; @@ -34,7 +33,6 @@ use papyrus_p2p_sync::client::{ }; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; -use papyrus_protobuf::consensus::ConsensusMessage; #[cfg(feature = "rpc")] use papyrus_rpc::run_server; use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; @@ -97,44 +95,48 @@ async fn create_rpc_server_future( } fn run_consensus( - config: ConsensusConfig, + config: Option<&ConsensusConfig>, storage_reader: StorageReader, - consensus_channels: BroadcastSubscriberChannels, + network_manager: Option<&mut NetworkManager>, ) -> anyhow::Result>> { - let validator_id = config.validator_id; - info!("Running consensus as validator {validator_id}"); + let (Some(config), Some(network_manager)) = (config, network_manager) else { + info!("Consensus is disabled."); + return Ok(tokio::spawn(pending())); + }; + + let network_channels = network_manager + .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; let context = PapyrusConsensusContext::new( storage_reader.clone(), - consensus_channels.messages_to_broadcast_sender, + network_channels.messages_to_broadcast_sender, config.num_validators, ); - let start_height = config.start_height; - match config.test { - Some(test_config) => { - let network_receiver = NetworkReceiver::new( - consensus_channels.broadcasted_messages_receiver, - test_config.cache_size, - test_config.random_seed, - test_config.drop_probability, - 
test_config.invalid_probability, - ); - Ok(tokio::spawn(papyrus_consensus::run_consensus( - context, - start_height, - validator_id, - config.consensus_delay, - network_receiver, - futures::stream::pending(), - ))) - } - None => Ok(tokio::spawn(papyrus_consensus::run_consensus( + // TODO(matan): connect this to an actual channel. + if let Some(test_config) = config.test.as_ref() { + let network_receiver = NetworkReceiver::new( + network_channels.broadcasted_messages_receiver, + test_config.cache_size, + test_config.random_seed, + test_config.drop_probability, + test_config.invalid_probability, + ); + Ok(tokio::spawn(papyrus_consensus::run_consensus( context, - start_height, - validator_id, + config.start_height, + config.validator_id, config.consensus_delay, - consensus_channels.broadcasted_messages_receiver, + network_receiver, futures::stream::pending(), - ))), + ))) + } else { + Ok(tokio::spawn(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + network_channels.broadcasted_messages_receiver, + futures::stream::pending(), + ))) } } @@ -149,13 +151,22 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { // P2P network. let ( - network_future, + mut maybe_network_manager, maybe_sync_client_channels, maybe_sync_server_channels, - maybe_consensus_channels, local_peer_id, - ) = run_network(config.network.clone(), config.consensus.clone())?; - let network_handle = tokio::spawn(network_future); + ) = register_to_network(config.network.clone())?; + let consensus_handle = run_consensus( + config.consensus.as_ref(), + storage_reader.clone(), + maybe_network_manager.as_mut(), + )?; + let network_handle = tokio::spawn(async move { + match maybe_network_manager { + Some(manager) => manager.run().boxed().await, + None => pending().boxed().await, + } + }); // Monitoring server. 
let monitoring_server = MonitoringServer::new( @@ -233,16 +244,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { let sync_handle = tokio::spawn(sync_future); let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future); - let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels { - run_consensus( - config.consensus.expect("If consensus_channels is Some, consensus must be Some too."), - storage_reader.clone(), - consensus_channels, - )? - } else { - tokio::spawn(pending()) - }; - tokio::select! { res = storage_metrics_handle => { error!("collecting storage metrics stopped."); @@ -326,20 +327,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { } } -type NetworkRunReturn = ( - BoxFuture<'static, Result<(), NetworkError>>, - Option, - Option, - Option>, - String, -); - -fn run_network( - network_config: Option, - consensus_config: Option, -) -> anyhow::Result { +type NetworkRunReturn = + (Option, Option, Option, String); + +fn register_to_network(network_config: Option) -> anyhow::Result { let Some(network_config) = network_config else { - return Ok((pending().boxed(), None, None, None, "".to_string())); + return Ok((None, None, None, "".to_string())); }; let mut network_manager = network_manager::NetworkManager::new(network_config.clone()); let local_peer_id = network_manager.get_local_peer_id(); @@ -361,14 +354,6 @@ fn run_network( let event_server_channel = network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE); - let consensus_channels = - match consensus_config { - Some(consensus_config) => Some(network_manager.register_broadcast_topic( - Topic::new(consensus_config.network_topic), - BUFFER_SIZE, - )?), - None => None, - }; let p2p_sync_client_channels = P2PSyncClientChannels::new( header_client_sender, state_diff_client_sender, @@ -383,10 +368,9 @@ fn run_network( ); Ok(( - network_manager.run().boxed(), + Some(network_manager), Some(p2p_sync_client_channels), 
Some(p2p_sync_server_channels), - consensus_channels, local_peer_id, )) } diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index f1061550027..80df134a3c2 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -46,6 +46,8 @@ where ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { + info!("Running consensus"); + // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error tokio::time::sleep(consensus_delay).await; let mut current_height = start_height; diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs index 642b63fbf75..9aec17055ce 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs @@ -17,7 +17,7 @@ use papyrus_storage::{StorageError, StorageReader}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::ContractAddress; use starknet_api::transaction::Transaction; -use tracing::{debug, debug_span, info, Instrument}; +use tracing::{debug, debug_span, info, warn, Instrument}; use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; use crate::ProposalWrapper; @@ -199,8 +199,11 @@ impl ConsensusContext for PapyrusConsensusContext { transactions.push(tx); } - let block_hash = - fin_receiver.await.expect("Failed to get block hash from fin receiver"); + let Ok(block_hash) = fin_receiver.await else { + // This can occur due to sync interrupting a height. + warn!("Failed to get block hash from fin receiver. {init:?}"); + return; + }; let proposal = Proposal { height: init.height.0, round: init.round,