diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index b40384a3f9f..c578bf58fe0 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,6 +7,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use futures::stream::StreamExt; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; @@ -40,14 +41,14 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig}; use papyrus_sync::sources::pending::PendingSource; use papyrus_sync::{StateSync, StateSyncError, SyncConfig}; -use starknet_api::block::BlockHash; +use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::felt; use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated}; use starknet_client::reader::PendingData; use tokio::sync::RwLock; use tokio::task::{JoinError, JoinHandle}; use tracing::metadata::LevelFilter; -use tracing::{debug_span, error, info, warn, Instrument}; +use tracing::{debug, debug_span, error, info, warn, Instrument}; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; @@ -103,17 +104,20 @@ fn run_consensus( info!("Consensus is disabled."); return Ok(tokio::spawn(pending())); }; + debug!("Consensus configuration: {config:?}"); let network_channels = network_manager .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; - let context = PapyrusConsensusContext::new( - storage_reader.clone(), - network_channels.messages_to_broadcast_sender, - config.num_validators, - None, - ); // TODO(matan): connect this to an actual channel. if let Some(test_config) = config.test.as_ref() { + let sync_channels = network_manager + .register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?; + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender, + config.num_validators, + Some(sync_channels.messages_to_broadcast_sender), + ); let network_receiver = NetworkReceiver::new( network_channels.broadcasted_messages_receiver, test_config.cache_size, @@ -121,15 +125,25 @@ fn run_consensus( test_config.drop_probability, test_config.invalid_probability, ); + let sync_receiver = + sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| { + BlockNumber(vote.expect("Sync channel should never have errors").height) + }); Ok(tokio::spawn(papyrus_consensus::run_consensus( context, config.start_height, config.validator_id, config.consensus_delay, network_receiver, - futures::stream::pending(), + sync_receiver, ))) } else { + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender, + config.num_validators, + None, + ); Ok(tokio::spawn(papyrus_consensus::run_consensus( context, config.start_height,