DO NOT SUBMIT #403

Closed
Changes from 1 commit
DO NOT SUBMIT
matan-starkware committed Aug 12, 2024
commit 689079f486b0f17e3a6022afbe08b446ffe2ea19
2 changes: 1 addition & 1 deletion crates/papyrus_common/src/metrics.rs
@@ -38,5 +38,5 @@ pub const PAPYRUS_NUM_ACTIVE_OUTBOUND_SESSIONS: &str = "papyrus_num_active_outbo
 /// Global variable set by the main config to enable collecting profiling metrics.
 pub static COLLECT_PROFILING_METRICS: OnceLock<bool> = OnceLock::new();
 
-/// The height most recently decided by consensus.
+/// The height consensus is currently working on.
 pub const PAPYRUS_CONSENSUS_HEIGHT: &str = "papyrus_consensus_height";
150 changes: 72 additions & 78 deletions crates/papyrus_node/src/main.rs
@@ -7,9 +7,7 @@ use std::process::exit;
 use std::sync::Arc;
 use std::time::Duration;
 
-use futures::channel::mpsc;
-use futures::future::BoxFuture;
-use futures::FutureExt;
+use futures::{FutureExt, StreamExt};
 use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
 use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
 use papyrus_common::pending_classes::PendingClasses;
@@ -23,7 +21,7 @@ use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
 use papyrus_consensus::types::ConsensusError;
 use papyrus_monitoring_gateway::MonitoringServer;
 use papyrus_network::gossipsub_impl::Topic;
-use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError};
+use papyrus_network::network_manager::NetworkManager;
 use papyrus_network::{network_manager, NetworkConfig};
 use papyrus_node::config::NodeConfig;
 use papyrus_node::version::VERSION_FULL;
@@ -35,22 +33,21 @@ use papyrus_p2p_sync::client::{
 };
 use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
 use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
-use papyrus_protobuf::consensus::ConsensusMessage;
 #[cfg(feature = "rpc")]
 use papyrus_rpc::run_server;
 use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
 use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerSource};
 use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
 use papyrus_sync::sources::pending::PendingSource;
 use papyrus_sync::{StateSync, StateSyncError, SyncConfig};
-use starknet_api::block::BlockHash;
+use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::felt;
 use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
 use starknet_client::reader::PendingData;
 use tokio::sync::RwLock;
 use tokio::task::{JoinError, JoinHandle};
 use tracing::metadata::LevelFilter;
-use tracing::{debug_span, error, info, warn, Instrument};
+use tracing::{debug, debug_span, error, info, warn, Instrument};
 use tracing_subscriber::prelude::*;
 use tracing_subscriber::{fmt, EnvFilter};
@@ -98,46 +95,62 @@ async fn create_rpc_server_future(
 }
 
 fn run_consensus(
-    config: ConsensusConfig,
+    config: Option<ConsensusConfig>,
     storage_reader: StorageReader,
-    consensus_channels: BroadcastSubscriberChannels<ConsensusMessage>,
+    maybe_network_manager: Option<&mut NetworkManager>,
 ) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> {
-    let validator_id = config.validator_id;
-    info!("Running consensus as validator {validator_id}");
-    let context = PapyrusConsensusContext::new(
-        storage_reader.clone(),
-        consensus_channels.messages_to_broadcast_sender,
-        config.num_validators,
-    );
-    let start_height = config.start_height;
-    // TODO(matan): connect this to an actual channel.
-    let (_, sync_rx) = mpsc::channel(1);
-    match config.test {
-        Some(test_config) => {
-            let network_receiver = NetworkReceiver::new(
-                consensus_channels.broadcasted_messages_receiver,
-                test_config.cache_size,
-                test_config.random_seed,
-                test_config.drop_probability,
-                test_config.invalid_probability,
-            );
-            Ok(tokio::spawn(papyrus_consensus::run_consensus(
-                context,
-                start_height,
-                validator_id,
-                config.consensus_delay,
-                network_receiver,
-                sync_rx,
-            )))
-        }
-        None => Ok(tokio::spawn(papyrus_consensus::run_consensus(
-            context,
-            start_height,
-            validator_id,
-            config.consensus_delay,
-            consensus_channels.broadcasted_messages_receiver,
-            sync_rx,
-        ))),
-    }
+    let (Some(config), Some(network_manager)) = (config, maybe_network_manager) else {
+        info!("Consensus is disabled.");
+        return Ok(tokio::spawn(pending()));
+    };
+    debug!("Consensus config {:?}", config);
+
+    let network_channels =
+        network_manager.register_broadcast_topic(Topic::new(config.topic), BUFFER_SIZE)?;
+    if let Some(test_config) = config.test {
+        // TODO(matan): Move topic to config.
+        let sync_channels = network_manager
+            .register_broadcast_topic(Topic::new("consensus_test_sync".to_string()), BUFFER_SIZE)?;
+        let context = PapyrusConsensusContext::new(
+            storage_reader.clone(),
+            network_channels.messages_to_broadcast_sender,
+            config.num_validators,
+            Some(sync_channels.messages_to_broadcast_sender),
+        );
+        let sync_receiver =
+            sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
+                BlockNumber(vote.expect("Sync channel should never have errors").height)
+            });
+        let network_receiver = NetworkReceiver::new(
+            network_channels.broadcasted_messages_receiver,
+            test_config.cache_size,
+            test_config.random_seed,
+            test_config.drop_probability,
+            test_config.invalid_probability,
+        );
+        Ok(tokio::spawn(papyrus_consensus::run_consensus(
+            context,
+            config.start_height,
+            config.validator_id,
+            config.consensus_delay,
+            network_receiver,
+            sync_receiver,
+        )))
+    } else {
+        let context = PapyrusConsensusContext::new(
+            storage_reader.clone(),
+            network_channels.messages_to_broadcast_sender,
+            config.num_validators,
+            None,
+        );
+        Ok(tokio::spawn(papyrus_consensus::run_consensus(
+            context,
+            config.start_height,
+            config.validator_id,
+            config.consensus_delay,
+            network_channels.broadcasted_messages_receiver,
+            futures::stream::pending(),
+        )))
+    }
 }
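
Note: the `sync_receiver` above adapts a broadcast subscription into the plain stream of heights that `papyrus_consensus::run_consensus` expects. A minimal, self-contained sketch of that adapter pattern, with toy types standing in for the network channel types (which are not shown in this diff):

    // Toy stand-in: mapping (message, report_sender) pairs to bare heights.
    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() {
        // Each item mimics (Result<Vote, Error>, ReportSender).
        let raw = stream::iter(vec![(Ok::<u64, ()>(7), "reporter"), (Ok(8), "reporter")]);
        // Drop the report sender and unwrap the message, keeping only heights.
        let mut heights =
            raw.map(|(vote, _report_sender)| vote.expect("sync channel should never have errors"));
        assert_eq!(heights.next().await, Some(7));
        assert_eq!(heights.next().await, Some(8));
    }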

@@ -152,13 +165,11 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
 
     // P2P network.
     let (
-        network_future,
+        mut maybe_network_manager,
         maybe_sync_client_channels,
         maybe_sync_server_channels,
-        maybe_consensus_channels,
         local_peer_id,
-    ) = run_network(config.network.clone(), config.consensus.clone())?;
-    let network_handle = tokio::spawn(network_future);
+    ) = run_network(config.network.clone())?;
 
     // Monitoring server.
     let monitoring_server = MonitoringServer::new(
@@ -236,16 +247,15 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
     let sync_handle = tokio::spawn(sync_future);
     let p2p_sync_client_handle = tokio::spawn(p2p_sync_client_future);
 
-    let consensus_handle = if let Some(consensus_channels) = maybe_consensus_channels {
-        run_consensus(
-            config.consensus.expect("If consensus_channels is Some, consensus must be Some too."),
-            storage_reader.clone(),
-            consensus_channels,
-        )?
-    } else {
-        tokio::spawn(pending())
-    };
+    let consensus_handle =
+        run_consensus(config.consensus, storage_reader.clone(), maybe_network_manager.as_mut())?;
 
+    let network_handle = tokio::spawn(async move {
+        match maybe_network_manager {
+            Some(manager) => manager.run().boxed().await,
+            None => pending().boxed().await,
+        }
+    });
     tokio::select! {
         res = storage_metrics_handle => {
             error!("collecting storage metrics stopped.");
@@ -329,20 +339,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
     }
 }
 
-type NetworkRunReturn = (
-    BoxFuture<'static, Result<(), NetworkError>>,
-    Option<P2PSyncClientChannels>,
-    Option<P2PSyncServerChannels>,
-    Option<BroadcastSubscriberChannels<ConsensusMessage>>,
-    String,
-);
-
-fn run_network(
-    network_config: Option<NetworkConfig>,
-    consensus_config: Option<ConsensusConfig>,
-) -> anyhow::Result<NetworkRunReturn> {
+type NetworkRunReturn =
+    (Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String);
+
+fn run_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> {
     let Some(network_config) = network_config else {
-        return Ok((pending().boxed(), None, None, None, "".to_string()));
+        return Ok((None, None, None, "".to_string()));
     };
     let mut network_manager = network_manager::NetworkManager::new(network_config.clone());
     let local_peer_id = network_manager.get_local_peer_id();
@@ -364,13 +366,6 @@ fn run_network(
     let event_server_channel =
         network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);
 
-    let consensus_channels = match consensus_config {
-        Some(consensus_config) => Some(
-            network_manager
-                .register_broadcast_topic(Topic::new(consensus_config.topic), BUFFER_SIZE)?,
-        ),
-        None => None,
-    };
     let p2p_sync_client_channels = P2PSyncClientChannels::new(
         header_client_sender,
         state_diff_client_sender,
@@ -385,10 +380,9 @@
     );
 
     Ok((
-        network_manager.run().boxed(),
+        Some(network_manager),
         Some(p2p_sync_client_channels),
         Some(p2p_sync_server_channels),
-        consensus_channels,
         local_peer_id,
     ))
 }
2 changes: 2 additions & 0 deletions crates/papyrus_protobuf/src/converters/consensus.rs
@@ -105,6 +105,8 @@ impl From<Vote> for protobuf::Vote {
     }
 }
 
+auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote);
+
 impl TryFrom<protobuf::ConsensusMessage> for ConsensusMessage {
     type Error = ProtobufConversionError;
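
Note: `auto_impl_into_and_try_from_vec_u8!` presumably wires `Vote` into the byte-level broadcast path used by the new sync topic. The macro's real expansion lives in papyrus_protobuf and is not shown in this diff; the following is only an illustrative sketch of the usual prost round-trip shape, with made-up names (`WireVote`, `SketchError`) rather than the actual expansion:

    use prost::Message;

    // Stand-in message type; real code converts Vote <-> protobuf::Vote first.
    #[derive(Clone, PartialEq, Message)]
    pub struct WireVote {
        #[prost(uint64, tag = "1")]
        pub height: u64,
    }

    #[derive(Debug)]
    pub enum SketchError {
        Decode,
    }

    impl From<WireVote> for Vec<u8> {
        fn from(vote: WireVote) -> Self {
            vote.encode_to_vec() // provided by prost::Message
        }
    }

    impl TryFrom<Vec<u8>> for WireVote {
        type Error = SketchError;
        fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
            WireVote::decode(&bytes[..]).map_err(|_| SketchError::Decode)
        }
    }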
4 changes: 4 additions & 0 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -9,6 +9,7 @@ use std::time::Duration;
 
 use futures::channel::{mpsc, oneshot};
 use futures::{Stream, StreamExt};
+use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT;
 use papyrus_network::network_manager::ReportSender;
 use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
 use papyrus_protobuf::converters::ProtobufConversionError;
@@ -45,11 +46,14 @@ where
     ProposalWrapper:
         Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
 {
+    info!("Running consensus");
     // Add a short delay to allow peers to connect and avoid "InsufficientPeers" error
     tokio::time::sleep(consensus_delay).await;
     let mut current_height = start_height;
     let mut manager = MultiHeightManager::new();
     loop {
+        metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);
+
         let run_height =
             manager.run_height(&mut context, current_height, validator_id, &mut network_receiver);
 
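
Note: with this change the gauge is set at the top of every height iteration, so the metric tracks the height consensus is currently working on (matching the doc-comment change in metrics.rs) rather than the last decided height. A sketch of the call, assuming the pre-0.22 `metrics`-crate macro form that this diff uses:

    use metrics::gauge;

    fn record_consensus_height(height: u64) {
        // A gauge records the latest value; here it is set once per height loop.
        gauge!("papyrus_consensus_height", height as f64);
    }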
crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs
@@ -9,7 +9,6 @@ use async_trait::async_trait;
 use futures::channel::{mpsc, oneshot};
 use futures::sink::SinkExt;
 use futures::StreamExt;
-use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT;
 use papyrus_network::network_manager::BroadcastSubscriberSender;
 use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote};
 use papyrus_storage::body::BodyStorageReader;
@@ -18,7 +17,7 @@ use papyrus_storage::{StorageError, StorageReader};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::core::ContractAddress;
 use starknet_api::transaction::Transaction;
-use tracing::{debug, debug_span, info, Instrument};
+use tracing::{debug, debug_span, info, warn, Instrument};
 
 use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};
 use crate::ProposalWrapper;
@@ -46,22 +45,25 @@ impl ConsensusBlock for PapyrusConsensusBlock {
 
 pub struct PapyrusConsensusContext {
     storage_reader: StorageReader,
-    broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
+    network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
     validators: Vec<ValidatorId>,
+    sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
 }
 
 impl PapyrusConsensusContext {
     // TODO(dvir): remove the dead code attribute after we will use this function.
     #[allow(dead_code)]
     pub fn new(
         storage_reader: StorageReader,
-        broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
+        network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
         num_validators: u64,
+        sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
     ) -> Self {
         Self {
             storage_reader,
-            broadcast_sender,
+            network_broadcast_sender,
             validators: (0..num_validators).map(ContractAddress::from).collect(),
+            sync_broadcast_sender,
         }
     }
 }
@@ -161,9 +163,13 @@ impl ConsensusContext for PapyrusConsensusContext {
                     panic!("Block in {height} was not found in storage despite waiting for it")
                 })
                 .block_hash;
+
+            // This can happen as a result of sync interrupting `run_height`.
             fin_sender
                 .send(PapyrusConsensusBlock { content: transactions, id: block_hash })
-                .expect("Send should succeed");
+                .unwrap_or_else(|_| {
+                    warn!("Failed to send block to consensus. height={height}");
+                })
         }
         .instrument(debug_span!("consensus_validate_proposal")),
     );
@@ -181,7 +187,7 @@ impl ConsensusContext for PapyrusConsensusContext {
 
     async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> {
         debug!("Broadcasting message: {message:?}");
-        self.broadcast_sender.send(message).await?;
+        self.network_broadcast_sender.send(message).await?;
         Ok(())
     }
 
@@ -191,7 +197,7 @@
         mut content_receiver: mpsc::Receiver<Transaction>,
         fin_receiver: oneshot::Receiver<BlockHash>,
     ) -> Result<(), ConsensusError> {
-        let mut broadcast_sender = self.broadcast_sender.clone();
+        let mut broadcast_sender = self.network_broadcast_sender.clone();
 
         tokio::spawn(
             async move {
@@ -237,7 +243,9 @@ impl ConsensusContext for PapyrusConsensusContext {
                 "Finished consensus for height: {height}. Agreed on block with id: {:x}",
                 block.id().0
             );
-        metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, height as f64);
+        if let Some(sender) = &self.sync_broadcast_sender {
+            sender.clone().send(precommits[0].clone()).await?;
+        }
         Ok(())
     }
 }