Skip to content

Commit

Permalink
feat(consensus): simulations with the test config will use the fake c…
Browse files Browse the repository at this point in the history
…onsensus sync
  • Loading branch information
matan-starkware committed Aug 12, 2024
1 parent 3271ddb commit aef6478
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::stream::StreamExt;
use futures::FutureExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
Expand Down Expand Up @@ -40,14 +41,14 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS
use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, StateSyncError, SyncConfig};
use starknet_api::block::BlockHash;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
use tokio::task::{JoinError, JoinHandle};
use tracing::metadata::LevelFilter;
use tracing::{debug_span, error, info, warn, Instrument};
use tracing::{debug, debug_span, error, info, warn, Instrument};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};

Expand Down Expand Up @@ -103,33 +104,46 @@ fn run_consensus(
info!("Consensus is disabled.");
return Ok(tokio::spawn(pending()));
};
debug!("Consensus configuration: {config:?}");

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender,
config.num_validators,
None,
);
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender,
config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
network_receiver,
futures::stream::pending(),
sync_receiver,
)))
} else {
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender,
config.num_validators,
None,
);
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
config.start_height,
Expand Down

0 comments on commit aef6478

Please sign in to comment.