diff --git a/Cargo.lock b/Cargo.lock index 5dec8d9a29f..415a224096f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6330,7 +6330,6 @@ dependencies = [ "papyrus_monitoring_gateway", "papyrus_network", "papyrus_p2p_sync", - "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "papyrus_sync", diff --git a/crates/papyrus_node/Cargo.toml b/crates/papyrus_node/Cargo.toml index e160c76d7eb..5b58eb9a995 100644 --- a/crates/papyrus_node/Cargo.toml +++ b/crates/papyrus_node/Cargo.toml @@ -32,7 +32,6 @@ papyrus_consensus.workspace = true papyrus_monitoring_gateway.workspace = true papyrus_network.workspace = true papyrus_p2p_sync.workspace = true -papyrus_protobuf.workspace = true papyrus_rpc = { workspace = true, optional = true } papyrus_storage.workspace = true papyrus_sync.workspace = true diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index d371bd90be8..eb78a3c613f 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,7 +7,6 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; -use futures::future::BoxFuture; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; @@ -22,7 +21,7 @@ use papyrus_consensus::simulation_network_receiver::NetworkReceiver; use papyrus_consensus::types::ConsensusError; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError}; +use papyrus_network::network_manager::NetworkManager; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_node::config::NodeConfig; use papyrus_node::version::VERSION_FULL; @@ -34,7 +33,6 @@ use papyrus_p2p_sync::client::{ }; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; -use papyrus_protobuf::consensus::ConsensusMessage; #[cfg(feature = "rpc")] use papyrus_rpc::run_server; use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; @@ -97,44 +95,48 @@ async fn create_rpc_server_future( } fn run_consensus( - config: ConsensusConfig, + config: Option<&ConsensusConfig>, storage_reader: StorageReader, - consensus_channels: BroadcastSubscriberChannels<ConsensusMessage>, + network_manager: Option<&mut NetworkManager>, ) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> { - let validator_id = config.validator_id; - info!("Running consensus as validator {validator_id}"); + let (Some(config), Some(network_manager)) = (config, network_manager) else { + info!("Consensus is disabled."); + return Ok(tokio::spawn(pending())); + }; + + let network_channels = network_manager + .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; let context = PapyrusConsensusContext::new( storage_reader.clone(), - consensus_channels.messages_to_broadcast_sender, + network_channels.messages_to_broadcast_sender, config.num_validators, ); - let start_height = config.start_height; - match config.test { - Some(test_config) => { - let network_receiver = NetworkReceiver::new( - consensus_channels.broadcasted_messages_receiver, - test_config.cache_size, - test_config.random_seed, - test_config.drop_probability, - test_config.invalid_probability, - ); - Ok(tokio::spawn(papyrus_consensus::run_consensus( - context, - start_height, - validator_id, - config.consensus_delay, - network_receiver, - futures::stream::pending(), - ))) - } - None => Ok(tokio::spawn(papyrus_consensus::run_consensus( + // TODO(matan): connect this to an actual channel. + if let Some(test_config) = config.test.as_ref() { + let network_receiver = NetworkReceiver::new( + network_channels.broadcasted_messages_receiver, + test_config.cache_size, + test_config.random_seed, + test_config.drop_probability, + test_config.invalid_probability, + ); + Ok(tokio::spawn(papyrus_consensus::run_consensus( context, - start_height, - validator_id, + config.start_height, + config.validator_id, config.consensus_delay, - consensus_channels.broadcasted_messages_receiver, + network_receiver, futures::stream::pending(), - ))), + ))) + } else { + Ok(tokio::spawn(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + network_channels.broadcasted_messages_receiver, + futures::stream::pending(), + ))) } } @@ -149,13 +151,22 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { // P2P network. let ( - network_future, + mut maybe_network_manager, maybe_sync_client_channels, maybe_sync_server_channels, - maybe_consensus_channels, local_peer_id, - ) = run_network(config.network.clone(), config.consensus.clone())?; - let network_handle = tokio::spawn(network_future); + ) = register_to_network(config.network.clone())?; + let consensus_handle = run_consensus( + config.consensus.as_ref(), + storage_reader.clone(), + maybe_network_manager.as_mut(), + )?; + let network_handle = tokio::spawn(async move { + match maybe_network_manager { + Some(manager) => manager.run().boxed().await, + None => pending().boxed().await, + } + }); // Monitoring server. let monitoring_server = MonitoringServer::new( @@ -233,16 +244,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { let sync_handle = tokio::spawn(sync_future); let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future); - let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels { - run_consensus( - config.consensus.expect("If consensus_channels is Some, consensus must be Some too."), - storage_reader.clone(), - consensus_channels, - )? - } else { - tokio::spawn(pending()) - }; - tokio::select! { res = storage_metrics_handle => { error!("collecting storage metrics stopped."); @@ -326,20 +327,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { } } -type NetworkRunReturn = ( - BoxFuture<'static, Result<(), NetworkError>>, - Option<P2PSyncClientChannels>, - Option<P2PSyncServerChannels>, - Option<BroadcastSubscriberChannels<ConsensusMessage>>, - String, -); - -fn run_network( - network_config: Option<NetworkConfig>, - consensus_config: Option<ConsensusConfig>, -) -> anyhow::Result<NetworkRunReturn> { +type NetworkRunReturn = + (Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String); + +fn register_to_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> { let Some(network_config) = network_config else { - return Ok((pending().boxed(), None, None, None, "".to_string())); + return Ok((None, None, None, "".to_string())); }; let mut network_manager = network_manager::NetworkManager::new(network_config.clone()); let local_peer_id = network_manager.get_local_peer_id(); @@ -361,14 +354,6 @@ fn run_network( let event_server_channel = network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE); - let consensus_channels = - match consensus_config { - Some(consensus_config) => Some(network_manager.register_broadcast_topic( - Topic::new(consensus_config.network_topic), - BUFFER_SIZE, - )?), - None => None, - }; let p2p_sync_client_channels = P2PSyncClientChannels::new( header_client_sender, state_diff_client_sender, @@ -383,10 +368,9 @@ fn run_network( ); Ok(( - network_manager.run().boxed(), + Some(network_manager), Some(p2p_sync_client_channels), Some(p2p_sync_server_channels), - consensus_channels, local_peer_id, )) } diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index f1061550027..80df134a3c2 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -46,6 +46,8 @@ where ProposalWrapper: Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>, { + info!("Running consensus"); + // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error tokio::time::sleep(consensus_delay).await; let mut current_height = start_height; diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs index 642b63fbf75..9aec17055ce 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs @@ -17,7 +17,7 @@ use papyrus_storage::{StorageError, StorageReader}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::ContractAddress; use starknet_api::transaction::Transaction; -use tracing::{debug, debug_span, info, Instrument}; +use tracing::{debug, debug_span, info, warn, Instrument}; use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId}; use crate::ProposalWrapper; @@ -199,8 +199,11 @@ impl ConsensusContext for PapyrusConsensusContext { transactions.push(tx); } - let block_hash = - fin_receiver.await.expect("Failed to get block hash from fin receiver"); + let Ok(block_hash) = fin_receiver.await else { + // This can occur due to sync interrupting a height. + warn!("Failed to get block hash from fin receiver. {init:?}"); + return; + }; let proposal = Proposal { height: init.height.0, round: init.round,