Skip to content

Commit

Permalink
refactor(consensus): consensus handles its network registrations on i…
Browse files Browse the repository at this point in the history
…ts own

This is because the network registration logic will become more complex with test sync.
In practice this means that after the initial network config we hold off on spawning the task
until all tasks have been set up.
  • Loading branch information
matan-starkware committed Aug 15, 2024
1 parent 39ebf3c commit 0fc07e5
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 74 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/papyrus_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ papyrus_consensus.workspace = true
papyrus_monitoring_gateway.workspace = true
papyrus_network.workspace = true
papyrus_p2p_sync.workspace = true
papyrus_protobuf.workspace = true
papyrus_rpc = { workspace = true, optional = true }
papyrus_storage.workspace = true
papyrus_sync.workspace = true
Expand Down
122 changes: 53 additions & 69 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use futures::FutureExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
Expand All @@ -22,7 +21,7 @@ use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus::types::ConsensusError;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError};
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_node::config::NodeConfig;
use papyrus_node::version::VERSION_FULL;
Expand All @@ -34,7 +33,6 @@ use papyrus_p2p_sync::client::{
};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::ConsensusMessage;
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
Expand Down Expand Up @@ -97,44 +95,48 @@ async fn create_rpc_server_future(
}

fn run_consensus(
config: ConsensusConfig,
config: Option<&ConsensusConfig>,
storage_reader: StorageReader,
consensus_channels: BroadcastSubscriberChannels<ConsensusMessage>,
network_manager: Option<&mut NetworkManager>,
) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> {
let validator_id = config.validator_id;
info!("Running consensus as validator {validator_id}");
let (Some(config), Some(network_manager)) = (config, network_manager) else {
info!("Consensus is disabled.");
return Ok(tokio::spawn(pending()));
};

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
consensus_channels.messages_to_broadcast_sender,
network_channels.messages_to_broadcast_sender,
config.num_validators,
);
let start_height = config.start_height;
match config.test {
Some(test_config) => {
let network_receiver = NetworkReceiver::new(
consensus_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
start_height,
validator_id,
config.consensus_delay,
network_receiver,
futures::stream::pending(),
)))
}
None => Ok(tokio::spawn(papyrus_consensus::run_consensus(
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
start_height,
validator_id,
config.start_height,
config.validator_id,
config.consensus_delay,
consensus_channels.broadcasted_messages_receiver,
network_receiver,
futures::stream::pending(),
))),
)))
} else {
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
network_channels.broadcasted_messages_receiver,
futures::stream::pending(),
)))
}
}

Expand All @@ -149,13 +151,22 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// P2P network.
let (
network_future,
mut maybe_network_manager,
maybe_sync_client_channels,
maybe_sync_server_channels,
maybe_consensus_channels,
local_peer_id,
) = run_network(config.network.clone(), config.consensus.clone())?;
let network_handle = tokio::spawn(network_future);
) = register_to_network(config.network.clone())?;
let consensus_handle = run_consensus(
config.consensus.as_ref(),
storage_reader.clone(),
maybe_network_manager.as_mut(),
)?;
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});

// Monitoring server.
let monitoring_server = MonitoringServer::new(
Expand Down Expand Up @@ -233,16 +244,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
let sync_handle = tokio::spawn(sync_future);
let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future);

let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels {
run_consensus(
config.consensus.expect("If consensus_channels is Some, consensus must be Some too."),
storage_reader.clone(),
consensus_channels,
)?
} else {
tokio::spawn(pending())
};

tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
Expand Down Expand Up @@ -326,20 +327,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
}
}

type NetworkRunReturn = (
BoxFuture<'static, Result<(), NetworkError>>,
Option<P2PSyncClientChannels>,
Option<P2PSyncServerChannels>,
Option<BroadcastSubscriberChannels<ConsensusMessage>>,
String,
);

fn run_network(
network_config: Option<NetworkConfig>,
consensus_config: Option<ConsensusConfig>,
) -> anyhow::Result<NetworkRunReturn> {
type NetworkRunReturn =
(Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String);

fn register_to_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> {
let Some(network_config) = network_config else {
return Ok((pending().boxed(), None, None, None, "".to_string()));
return Ok((None, None, None, "".to_string()));
};
let mut network_manager = network_manager::NetworkManager::new(network_config.clone());
let local_peer_id = network_manager.get_local_peer_id();
Expand All @@ -361,14 +354,6 @@ fn run_network(
let event_server_channel =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);

let consensus_channels =
match consensus_config {
Some(consensus_config) => Some(network_manager.register_broadcast_topic(
Topic::new(consensus_config.network_topic),
BUFFER_SIZE,
)?),
None => None,
};
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
Expand All @@ -383,10 +368,9 @@ fn run_network(
);

Ok((
network_manager.run().boxed(),
Some(network_manager),
Some(p2p_sync_client_channels),
Some(p2p_sync_server_channels),
consensus_channels,
local_peer_id,
))
}
Expand Down
2 changes: 2 additions & 0 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ where
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
info!("Running consensus");

// Add a short delay to allow peers to connect and avoid "InsufficientPeers" error
tokio::time::sleep(consensus_delay).await;
let mut current_height = start_height;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use tracing::{debug, debug_span, info, Instrument};
use tracing::{debug, debug_span, info, warn, Instrument};

use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};
use crate::ProposalWrapper;
Expand Down Expand Up @@ -199,8 +199,11 @@ impl ConsensusContext for PapyrusConsensusContext {
transactions.push(tx);
}

let block_hash =
fin_receiver.await.expect("Failed to get block hash from fin receiver");
let Ok(block_hash) = fin_receiver.await else {
// This can occur due to sync interrupting a height.
warn!("Failed to get block hash from fin receiver. {init:?}");
return;
};
let proposal = Proposal {
height: init.height.0,
round: init.round,
Expand Down

0 comments on commit 0fc07e5

Please sign in to comment.