Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(consensus): consensus handles its network registrations on its own #409

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/papyrus_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ papyrus_consensus.workspace = true
papyrus_monitoring_gateway.workspace = true
papyrus_network.workspace = true
papyrus_p2p_sync.workspace = true
papyrus_protobuf.workspace = true
papyrus_rpc = { workspace = true, optional = true }
papyrus_storage.workspace = true
papyrus_sync.workspace = true
Expand Down
122 changes: 53 additions & 69 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use futures::FutureExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
Expand All @@ -22,7 +21,7 @@ use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus::types::ConsensusError;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError};
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_node::config::NodeConfig;
use papyrus_node::version::VERSION_FULL;
Expand All @@ -34,7 +33,6 @@ use papyrus_p2p_sync::client::{
};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::ConsensusMessage;
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
Expand Down Expand Up @@ -97,44 +95,48 @@ async fn create_rpc_server_future(
}

fn run_consensus(
config: ConsensusConfig,
config: Option<&ConsensusConfig>,
storage_reader: StorageReader,
consensus_channels: BroadcastSubscriberChannels<ConsensusMessage>,
network_manager: Option<&mut NetworkManager>,
) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> {
let validator_id = config.validator_id;
info!("Running consensus as validator {validator_id}");
let (Some(config), Some(network_manager)) = (config, network_manager) else {
info!("Consensus is disabled.");
return Ok(tokio::spawn(pending()));
};

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
consensus_channels.messages_to_broadcast_sender,
network_channels.messages_to_broadcast_sender,
config.num_validators,
);
let start_height = config.start_height;
match config.test {
Some(test_config) => {
let network_receiver = NetworkReceiver::new(
consensus_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
start_height,
validator_id,
config.consensus_delay,
network_receiver,
futures::stream::pending(),
)))
}
None => Ok(tokio::spawn(papyrus_consensus::run_consensus(
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
start_height,
validator_id,
config.start_height,
config.validator_id,
config.consensus_delay,
consensus_channels.broadcasted_messages_receiver,
network_receiver,
futures::stream::pending(),
))),
)))
} else {
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
network_channels.broadcasted_messages_receiver,
futures::stream::pending(),
)))
}
}

Expand All @@ -149,13 +151,22 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// P2P network.
let (
network_future,
mut maybe_network_manager,
maybe_sync_client_channels,
maybe_sync_server_channels,
maybe_consensus_channels,
local_peer_id,
) = run_network(config.network.clone(), config.consensus.clone())?;
let network_handle = tokio::spawn(network_future);
) = register_to_network(config.network.clone())?;
let consensus_handle = run_consensus(
config.consensus.as_ref(),
storage_reader.clone(),
maybe_network_manager.as_mut(),
)?;
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});

// Monitoring server.
let monitoring_server = MonitoringServer::new(
Expand Down Expand Up @@ -233,16 +244,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
let sync_handle = tokio::spawn(sync_future);
let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future);

let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels {
run_consensus(
config.consensus.expect("If consensus_channels is Some, consensus must be Some too."),
storage_reader.clone(),
consensus_channels,
)?
} else {
tokio::spawn(pending())
};

tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
Expand Down Expand Up @@ -326,20 +327,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
}
}

type NetworkRunReturn = (
BoxFuture<'static, Result<(), NetworkError>>,
Option<P2PSyncClientChannels>,
Option<P2PSyncServerChannels>,
Option<BroadcastSubscriberChannels<ConsensusMessage>>,
String,
);

fn run_network(
network_config: Option<NetworkConfig>,
consensus_config: Option<ConsensusConfig>,
) -> anyhow::Result<NetworkRunReturn> {
type NetworkRunReturn =
(Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String);

fn register_to_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> {
let Some(network_config) = network_config else {
return Ok((pending().boxed(), None, None, None, "".to_string()));
return Ok((None, None, None, "".to_string()));
};
let mut network_manager = network_manager::NetworkManager::new(network_config.clone());
let local_peer_id = network_manager.get_local_peer_id();
Expand All @@ -361,14 +354,6 @@ fn run_network(
let event_server_channel =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);

let consensus_channels =
match consensus_config {
Some(consensus_config) => Some(network_manager.register_broadcast_topic(
Topic::new(consensus_config.network_topic),
BUFFER_SIZE,
)?),
None => None,
};
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
Expand All @@ -383,10 +368,9 @@ fn run_network(
);

Ok((
network_manager.run().boxed(),
Some(network_manager),
Some(p2p_sync_client_channels),
Some(p2p_sync_server_channels),
consensus_channels,
local_peer_id,
))
}
Expand Down
2 changes: 2 additions & 0 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ where
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
info!("Running consensus");

// Add a short delay to allow peers to connect and avoid "InsufficientPeers" error
tokio::time::sleep(consensus_delay).await;
let mut current_height = start_height;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use tracing::{debug, debug_span, info, Instrument};
use tracing::{debug, debug_span, info, warn, Instrument};

use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};
use crate::ProposalWrapper;
Expand Down Expand Up @@ -199,8 +199,11 @@ impl ConsensusContext for PapyrusConsensusContext {
transactions.push(tx);
}

let block_hash =
fin_receiver.await.expect("Failed to get block hash from fin receiver");
let Ok(block_hash) = fin_receiver.await else {
// This can occur due to sync interrupting a height.
warn!("Failed to get block hash from fin receiver. {init:?}");
return;
};
let proposal = Proposal {
height: init.height.0,
round: init.round,
Expand Down
Loading