Skip to content

Commit

Permalink
DO NOT SUBMIT
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware committed Aug 12, 2024
1 parent 82930cd commit 689079f
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 97 deletions.
2 changes: 1 addition & 1 deletion crates/papyrus_common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ pub const PAPYRUS_NUM_ACTIVE_OUTBOUND_SESSIONS: &str = "papyrus_num_active_outbo
/// Global variable set by the main config to enable collecting profiling metrics.
pub static COLLECT_PROFILING_METRICS: OnceLock<bool> = OnceLock::new();

/// The height most recently decided by consensus.
/// The height consensus is currently working on.
pub const PAPYRUS_CONSENSUS_HEIGHT: &str = "papyrus_consensus_height";
150 changes: 72 additions & 78 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::channel::mpsc;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{FutureExt, StreamExt};
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
Expand All @@ -23,7 +21,7 @@ use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus::types::ConsensusError;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError};
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_node::config::NodeConfig;
use papyrus_node::version::VERSION_FULL;
Expand All @@ -35,22 +33,21 @@ use papyrus_p2p_sync::client::{
};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::ConsensusMessage;
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerSource};
use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, StateSyncError, SyncConfig};
use starknet_api::block::BlockHash;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
use tokio::task::{JoinError, JoinHandle};
use tracing::metadata::LevelFilter;
use tracing::{debug_span, error, info, warn, Instrument};
use tracing::{debug, debug_span, error, info, warn, Instrument};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};

Expand Down Expand Up @@ -98,46 +95,62 @@ async fn create_rpc_server_future(
}

fn run_consensus(
config: ConsensusConfig,
config: Option<ConsensusConfig>,
storage_reader: StorageReader,
consensus_channels: BroadcastSubscriberChannels<ConsensusMessage>,
maybe_network_manager: Option<&mut NetworkManager>,
) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> {
let validator_id = config.validator_id;
info!("Running consensus as validator {validator_id}");
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
consensus_channels.messages_to_broadcast_sender,
config.num_validators,
);
let start_height = config.start_height;
// TODO(matan): connect this to an actual channel.
let (_, sync_rx) = mpsc::channel(1);
match config.test {
Some(test_config) => {
let network_receiver = NetworkReceiver::new(
consensus_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
start_height,
validator_id,
config.consensus_delay,
network_receiver,
sync_rx,
)))
}
None => Ok(tokio::spawn(papyrus_consensus::run_consensus(
let (Some(config), Some(network_manager)) = (config, maybe_network_manager) else {
info!("Consensus is disabled.");
return Ok(tokio::spawn(pending()));
};
debug!("Consensus config {:?}", config);

let network_channels =
network_manager.register_broadcast_topic(Topic::new(config.topic), BUFFER_SIZE)?;
if let Some(test_config) = config.test {
// TODO(matan): Move topic to config.
let sync_channels = network_manager
.register_broadcast_topic(Topic::new("consensus_test_sync".to_string()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender,
config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
start_height,
validator_id,
config.start_height,
config.validator_id,
config.consensus_delay,
consensus_channels.broadcasted_messages_receiver,
sync_rx,
))),
network_receiver,
sync_receiver,
)))
} else {
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender,
config.num_validators,
None,
);
Ok(tokio::spawn(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
network_channels.broadcasted_messages_receiver,
futures::stream::pending(),
)))
}
}

Expand All @@ -152,13 +165,11 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// P2P network.
let (
network_future,
mut maybe_network_manager,
maybe_sync_client_channels,
maybe_sync_server_channels,
maybe_consensus_channels,
local_peer_id,
) = run_network(config.network.clone(), config.consensus.clone())?;
let network_handle = tokio::spawn(network_future);
) = run_network(config.network.clone())?;

// Monitoring server.
let monitoring_server = MonitoringServer::new(
Expand Down Expand Up @@ -236,16 +247,15 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
let sync_handle = tokio::spawn(sync_future);
let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future);

let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels {
run_consensus(
config.consensus.expect("If consensus_channels is Some, consensus must be Some too."),
storage_reader.clone(),
consensus_channels,
)?
} else {
tokio::spawn(pending())
};
let consensus_handle =
run_consensus(config.consensus, storage_reader.clone(), maybe_network_manager.as_mut())?;

let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});
tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
Expand Down Expand Up @@ -329,20 +339,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
}
}

type NetworkRunReturn = (
BoxFuture<'static, Result<(), NetworkError>>,
Option<P2PSyncClientChannels>,
Option<P2PSyncServerChannels>,
Option<BroadcastSubscriberChannels<ConsensusMessage>>,
String,
);

fn run_network(
network_config: Option<NetworkConfig>,
consensus_config: Option<ConsensusConfig>,
) -> anyhow::Result<NetworkRunReturn> {
type NetworkRunReturn =
(Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String);

fn run_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> {
let Some(network_config) = network_config else {
return Ok((pending().boxed(), None, None, None, "".to_string()));
return Ok((None, None, None, "".to_string()));
};
let mut network_manager = network_manager::NetworkManager::new(network_config.clone());
let local_peer_id = network_manager.get_local_peer_id();
Expand All @@ -364,13 +366,6 @@ fn run_network(
let event_server_channel =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);

let consensus_channels = match consensus_config {
Some(consensus_config) => Some(
network_manager
.register_broadcast_topic(Topic::new(consensus_config.topic), BUFFER_SIZE)?,
),
None => None,
};
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
Expand All @@ -385,10 +380,9 @@ fn run_network(
);

Ok((
network_manager.run().boxed(),
Some(network_manager),
Some(p2p_sync_client_channels),
Some(p2p_sync_server_channels),
consensus_channels,
local_peer_id,
))
}
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_protobuf/src/converters/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ impl From<Vote> for protobuf::Vote {
}
}

auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote);

impl TryFrom<protobuf::ConsensusMessage> for ConsensusMessage {
type Error = ProtobufConversionError;

Expand Down
4 changes: 4 additions & 0 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::{Stream, StreamExt};
use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT;
use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use papyrus_protobuf::converters::ProtobufConversionError;
Expand Down Expand Up @@ -45,11 +46,14 @@ where
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
info!("Running consensus");
// Add a short delay to allow peers to connect and avoid "InsufficientPeers" error
tokio::time::sleep(consensus_delay).await;
let mut current_height = start_height;
let mut manager = MultiHeightManager::new();
loop {
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);

let run_height =
manager.run_height(&mut context, current_height, validator_id, &mut network_receiver);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::sink::SinkExt;
use futures::StreamExt;
use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT;
use papyrus_network::network_manager::BroadcastSubscriberSender;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote};
use papyrus_storage::body::BodyStorageReader;
Expand All @@ -18,7 +17,7 @@ use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use tracing::{debug, debug_span, info, Instrument};
use tracing::{debug, debug_span, info, warn, Instrument};

use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};
use crate::ProposalWrapper;
Expand Down Expand Up @@ -46,22 +45,25 @@ impl ConsensusBlock for PapyrusConsensusBlock {

pub struct PapyrusConsensusContext {
storage_reader: StorageReader,
broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
validators: Vec<ValidatorId>,
sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
}

impl PapyrusConsensusContext {
// TODO(dvir): remove the dead code attribute after we will use this function.
#[allow(dead_code)]
pub fn new(
storage_reader: StorageReader,
broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
num_validators: u64,
sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
) -> Self {
Self {
storage_reader,
broadcast_sender,
network_broadcast_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
sync_broadcast_sender,
}
}
}
Expand Down Expand Up @@ -161,9 +163,13 @@ impl ConsensusContext for PapyrusConsensusContext {
panic!("Block in {height} was not found in storage despite waiting for it")
})
.block_hash;

// This can happen as a result of sync interrupting `run_height`.
fin_sender
.send(PapyrusConsensusBlock { content: transactions, id: block_hash })
.expect("Send should succeed");
.unwrap_or_else(|_| {
warn!("Failed to send block to consensus. height={height}");
})
}
.instrument(debug_span!("consensus_validate_proposal")),
);
Expand All @@ -181,7 +187,7 @@ impl ConsensusContext for PapyrusConsensusContext {

async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> {
debug!("Broadcasting message: {message:?}");
self.broadcast_sender.send(message).await?;
self.network_broadcast_sender.send(message).await?;
Ok(())
}

Expand All @@ -191,7 +197,7 @@ impl ConsensusContext for PapyrusConsensusContext {
mut content_receiver: mpsc::Receiver<Transaction>,
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<(), ConsensusError> {
let mut broadcast_sender = self.broadcast_sender.clone();
let mut broadcast_sender = self.network_broadcast_sender.clone();

tokio::spawn(
async move {
Expand Down Expand Up @@ -237,7 +243,9 @@ impl ConsensusContext for PapyrusConsensusContext {
"Finished consensus for height: {height}. Agreed on block with id: {:x}",
block.id().0
);
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, height as f64);
if let Some(sender) = &self.sync_broadcast_sender {
sender.clone().send(precommits[0].clone()).await?;
}
Ok(())
}
}
Expand Down
Loading

0 comments on commit 689079f

Please sign in to comment.