feat: integrate proposal part into consensus and context
guy-starkware committed Nov 18, 2024
1 parent 48cad7e commit fc65213
Showing 20 changed files with 674 additions and 271 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/papyrus_node/Cargo.toml
@@ -39,6 +39,7 @@ papyrus_consensus_orchestrator.workspace = true
papyrus_monitoring_gateway.workspace = true
papyrus_network.workspace = true
papyrus_p2p_sync.workspace = true
papyrus_protobuf.workspace = true
papyrus_rpc = { workspace = true, optional = true }
papyrus_storage.workspace = true
papyrus_sync.workspace = true
19 changes: 18 additions & 1 deletion crates/papyrus_node/src/run.rs
@@ -13,14 +13,16 @@ use papyrus_common::pending_classes::PendingClasses;
use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
@@ -49,6 +51,7 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO;
// different genesis hash.
// TODO: Consider moving to a more general place.
const GENESIS_HASH: &str = "0x0";
pub const NETWORK_TOPIC: &str = "consensus_proposals";

// TODO(dvir): add this to config.
// Duration between updates to the storage metrics (those in the collect_storage_metrics function).
@@ -185,12 +188,25 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let proposal_network_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>> =
network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = proposal_network_channels;

let (outbound_internal_sender, inbound_internal_receiver) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.broadcast_topic_client.clone(),
// outbound_network_sender.clone(),
outbound_internal_sender,
config.num_validators,
None,
);

Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
@@ -199,6 +215,7 @@ config.consensus_delay,
config.consensus_delay,
config.timeouts.clone(),
network_channels.into(),
inbound_internal_receiver,
futures::stream::pending(),
)
.await?)
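
For orientation, here is a minimal sketch (the aliases and the `main` body are mine, not from the crates) of the channel shapes that this wiring threads from `StreamHandler::get_channels` into `run_consensus`:

use futures::channel::mpsc;
use papyrus_protobuf::consensus::ProposalPart;

// Illustrative aliases only. StreamHandler sits between the gossip topic and
// consensus, turning the flat network stream of StreamMessage<ProposalPart>
// into one inner channel per streamed proposal.
type ProposalStream = mpsc::Receiver<ProposalPart>;
// The shape of `inbound_internal_receiver` handed to run_consensus:
// a stream of per-proposal streams.
type InboundProposals = mpsc::Receiver<ProposalStream>;

fn main() {
    // Nothing to run here; this only demonstrates the nested channel shape.
    let (_sender, _receiver): (mpsc::Sender<ProposalStream>, InboundProposals) =
        mpsc::channel(16);
}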
80 changes: 25 additions & 55 deletions crates/papyrus_protobuf/src/consensus.rs
@@ -1,7 +1,5 @@
use futures::channel::{mpsc, oneshot};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
use starknet_api::transaction::Transaction;

use crate::converters::ProtobufConversionError;
@@ -34,7 +32,7 @@ pub struct Vote {

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal),
Proposal(Proposal), // To be deprecated
Vote(Vote),
}

@@ -77,10 +75,10 @@ pub struct ProposalInit {
#[derive(Debug, Clone, PartialEq)]
pub struct TransactionBatch {
/// The transactions in the batch.
pub transactions: Vec<Transaction>,
pub transactions: Vec<Transaction>, // TODO(guyn): should be (Transaction, Hash)?
}

/// The propsal is done when receiving this fin message, which contains the block hash.
/// The proposal is done when receiving this fin message, which contains the block hash.
#[derive(Debug, Clone, PartialEq)]
pub struct ProposalFin {
/// The block hash of the proposed block.
@@ -99,6 +97,28 @@ pub enum ProposalPart {
Fin(ProposalFin),
}

impl TryInto<ProposalInit> for ProposalPart {
type Error = ProtobufConversionError;

fn try_into(self: ProposalPart) -> Result<ProposalInit, Self::Error> {
match self {
ProposalPart::Init(init) => Ok(init),
_ => Err(ProtobufConversionError::WrongEnumVariant {
type_description: "ProposalPart",
value_as_str: format!("{:?}", self),
expected: "Init",
got: "Transactions or Fin",
}),
}
}
}

impl From<ProposalInit> for ProposalPart {
fn from(value: ProposalInit) -> Self {
ProposalPart::Init(value)
}
}

impl<T> std::fmt::Display for StreamMessage<T>
where
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
@@ -123,53 +143,3 @@ where
}
}
}

// TODO(Guy): Remove after implementing broadcast streams.
#[allow(missing_docs)]
pub struct ProposalWrapper(pub Proposal);

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};
let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
for tx in transactions {
content_sender.try_send(tx).expect("Send should succeed");
}
content_sender.close_channel();

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};

let (_, content_receiver) = mpsc::channel(0);
// This should only be used for Milestone 1, and then removed once streaming is supported.
println!("Cannot build ExecutableTransaction from Transaction.");

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}
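
As I read the new conversions, a round trip looks like this (hypothetical snippet; it assumes `ProtobufConversionError` is reachable at `papyrus_protobuf::converters`):

use papyrus_protobuf::consensus::{ProposalInit, ProposalPart};
use papyrus_protobuf::converters::ProtobufConversionError;

// Wrap an init into a part, then recover it. `try_into` succeeds only for the
// Init variant; Transactions or Fin surface the new WrongEnumVariant error.
fn expect_init(part: ProposalPart) -> Result<ProposalInit, ProtobufConversionError> {
    part.try_into()
}

fn round_trip(init: ProposalInit) -> Result<ProposalInit, ProtobufConversionError> {
    let part: ProposalPart = init.into();
    expect_init(part)
}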
1 change: 1 addition & 0 deletions crates/papyrus_protobuf/src/converters/consensus.rs
@@ -304,6 +304,7 @@ impl From<ProposalPart> for protobuf::ProposalPart {

auto_impl_into_and_try_from_vec_u8!(ProposalPart, protobuf::ProposalPart);

// TODO(guyn): remove this once we are happy with how proposals are sent separate from votes.
impl TryFrom<protobuf::ConsensusMessage> for ConsensusMessage {
type Error = ProtobufConversionError;

7 changes: 7 additions & 0 deletions crates/papyrus_protobuf/src/converters/mod.rs
@@ -22,6 +22,13 @@ pub enum ProtobufConversionError {
MissingField { field_description: &'static str },
#[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")]
BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec<u8> },
#[error("Type `{type_description}` got unexpected enum variant {value_as_str}")]
WrongEnumVariant {
type_description: &'static str,
value_as_str: String,
expected: &'static str,
got: &'static str,
},
#[error(transparent)]
DecodeError(#[from] DecodeError),
/// For CompressionError and serde_json::Error we put the string of the error instead of the
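
For illustration, constructing and printing the new variant might look like this (my sketch; note that the `#[error]` format string interpolates only `type_description` and `value_as_str`, while `expected` and `got` are carried for callers to match on):

use papyrus_protobuf::converters::ProtobufConversionError;

fn main() {
    let err = ProtobufConversionError::WrongEnumVariant {
        type_description: "ProposalPart",
        value_as_str: "Fin(..)".to_string(),
        expected: "Init",
        got: "Transactions or Fin",
    };
    // Prints: Type `ProposalPart` got unexpected enum variant Fin(..)
    println!("{err}");
}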
@@ -2,6 +2,7 @@ syntax = "proto3";
import "p2p/proto/transaction.proto";
import "p2p/proto/common.proto";

// To be deprecated
message Proposal {
uint64 height = 1;
uint32 round = 2;
92 changes: 57 additions & 35 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -7,13 +7,13 @@ mod manager_test;
use std::collections::BTreeMap;
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalWrapper};
use starknet_api::block::{BlockHash, BlockNumber};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
@@ -37,13 +37,12 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
consensus_delay: Duration,
timeouts: TimeoutsConfig,
mut broadcast_channels: BroadcastConsensusMessageChannel,
mut inbound_proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
where
ContextT: ConsensusContext,
ContextT: ConsensusContext + 'static,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<ContextT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
{
info!(
"Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}",
@@ -57,11 +56,17 @@ where
tokio::time::sleep(consensus_delay).await;
let mut current_height = start_height;
let mut manager = MultiHeightManager::new(validator_id, timeouts);

#[allow(clippy::as_conversions)] // FIXME: use int metrics so `as f64` may be removed.
loop {
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);

let run_height = manager.run_height(&mut context, current_height, &mut broadcast_channels);
let run_height = manager.run_height(
&mut context,
current_height,
&mut broadcast_channels,
&mut inbound_proposal_receiver,
);

// `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop
// it. We also cannot restart the height; when we dropped the future we dropped the state it
@@ -101,20 +106,13 @@ impl MultiHeightManager {
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messages`.
#[instrument(skip(self, context, broadcast_channels), level = "info")]
pub async fn run_height<ContextT>(
pub async fn run_height<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<Decision, ConsensusError> {
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
let mut shc = SingleHeightConsensus::new(
@@ -140,6 +138,9 @@ impl MultiHeightManager {
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(content_receiver) = proposal_receiver.next() => {
self.handle_proposal(context, height, &mut shc, content_receiver).await?
},
Some(shc_event) = shc_events.next() => {
shc.handle_event(context, shc_event).await?
},
Expand All @@ -156,22 +157,41 @@ impl MultiHeightManager {
}
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
// proposal_init: (BlockNumber, u32, ContractAddress, Option<u32>),
mut content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError> {
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError(
"Proposal receiver closed".to_string(),
));
};
let proposal_init: ProposalInit = first_part.into().try_into()?;

// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
if proposal_init.height != height {
debug!("Received a proposal for a different height. {:?}", proposal_init);
// if message.height() > height.0 {
// self.cached_messages.entry(message.height()).or_default().push(message);
// }
return Ok(ShcReturn::Tasks(Vec::new()));
}
shc.handle_proposal(context, proposal_init.into(), content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
async fn handle_message<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
message: ConsensusMessage,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
) -> Result<ShcReturn, ConsensusError> {
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
@@ -184,14 +204,16 @@ impl MultiHeightManager {
return Ok(ShcReturn::Tasks(Vec::new()));
}
match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
let res = shc
.handle_proposal(context, proposal_init, content_receiver, fin_receiver)
.await?;
Ok(res)
ConsensusMessage::Proposal(_proposal) => {
// Special case due to fake streaming. TODO(guyn): We can eliminate this option and
// leave handle_message. let (proposal_init, content_receiver,
// fin_receiver) = ProposalWrapper(proposal).into();
// let res = shc
// .handle_proposal(context, proposal_init.into(), content_receiver,
// fin_receiver) .await?;
Err(ConsensusError::InternalNetworkError(
"Proposal variant of ConsensusMessage no longer supported".to_string(),
))
}
_ => {
let res = shc.handle_message(context, message).await?;
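
Condensed, the receive path that `handle_proposal` implements does roughly the following (my sketch, with error handling elided):

use futures::channel::mpsc;
use futures::StreamExt;
use papyrus_protobuf::consensus::{ProposalInit, ProposalPart};

// Pull the next proposal stream off the nested receiver, require its first
// part to be Init, and hand the remainder of the stream onward.
async fn next_proposal(
    proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ProposalPart>>,
) -> Option<(ProposalInit, mpsc::Receiver<ProposalPart>)> {
    let mut content_receiver = proposal_receiver.next().await?;
    let first_part = content_receiver.next().await?;
    let init: ProposalInit = first_part.try_into().ok()?;
    Some((init, content_receiver))
}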
(Diff for the remaining 12 changed files is not rendered here.)