Skip to content

Commit

Permalink
feat: integrate streaming with consensus proposals
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Nov 12, 2024
1 parent 1097d48 commit c994b3b
Show file tree
Hide file tree
Showing 13 changed files with 407 additions and 198 deletions.
23 changes: 19 additions & 4 deletions crates/consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ use futures::channel::mpsc::{self, SendError};
use futures::future::Ready;
use futures::SinkExt;
use libp2p::PeerId;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError};
use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager};
use papyrus_network::network_manager::{
BroadcastTopicChannels,
BroadcastTopicClient,
NetworkManager,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, StreamMessage};
use starknet_batcher_types::communication::SharedBatcherClient;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
Expand All @@ -38,14 +43,23 @@ impl ConsensusManager {
let mut network_manager =
NetworkManager::new(self.config.consensus_config.network_config.clone(), None);
let proposals_broadcast_channels = network_manager
.register_broadcast_topic::<ProposalPart>(
.register_broadcast_topic::<StreamMessage<ProposalPart>>(
Topic::new(NETWORK_TOPIC),
BROADCAST_BUFFER_SIZE,
)
.expect("Failed to register broadcast topic");
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = proposals_broadcast_channels;

let (outbound_internal_sender, inbound_internal_receiver) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

let context = SequencerConsensusContext::new(
Arc::clone(&self.batcher_client),
proposals_broadcast_channels.broadcast_topic_client.clone(),
outbound_internal_sender,
// proposals_broadcast_channels.broadcast_topic_client.clone(),
self.config.consensus_config.num_validators,
);

Expand All @@ -57,6 +71,7 @@ impl ConsensusManager {
self.config.consensus_config.consensus_delay,
self.config.consensus_config.timeouts.clone(),
create_fake_network_channels(),
inbound_internal_receiver,
futures::stream::pending(),
);

Expand Down
20 changes: 18 additions & 2 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use papyrus_common::pending_classes::PendingClasses;
use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
Expand Down Expand Up @@ -49,6 +50,7 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO;
// different genesis hash.
// TODO: Consider moving to a more general place.
const GENESIS_HASH: &str = "0x0";
pub const NETWORK_TOPIC: &str = "consensus_proposals";

// TODO(dvir): add this to config.
// Duration between updates to the storage metrics (those in the collect_storage_metrics function).
Expand Down Expand Up @@ -185,12 +187,25 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let proposal_network_channels =
network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = proposal_network_channels;

let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.broadcast_topic_client.clone(),
// outbound_internal_sender,
// network_channels.broadcast_topic_client.clone(),
outbound_network_sender.clone(),
config.num_validators,
None,
);

let (outbound_internal_sender, inbound_internal_receiver) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
Expand All @@ -199,6 +214,7 @@ fn spawn_consensus(
config.consensus_delay,
config.timeouts.clone(),
network_channels.into(),
inbound_internal_receiver,
futures::stream::pending(),
)
.await?)
Expand Down
123 changes: 72 additions & 51 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use futures::channel::{mpsc, oneshot};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
// use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
use starknet_api::transaction::Transaction;

use crate::converters::ProtobufConversionError;
Expand Down Expand Up @@ -34,7 +33,7 @@ pub struct Vote {

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal),
Proposal(Proposal), // To be deprecated
Vote(Vote),
}

Expand Down Expand Up @@ -99,6 +98,28 @@ pub enum ProposalPart {
Fin(ProposalFin),
}

impl TryInto<ProposalInit> for ProposalPart {
type Error = ProtobufConversionError;

fn try_into(self: ProposalPart) -> Result<ProposalInit, Self::Error> {
match self {
ProposalPart::Init(init) => Ok(init),
_ => Err(ProtobufConversionError::WrongEnumVariant {
type_description: "ProposalPart",
value_as_str: format!("{:?}", self),
expected: "Init",
got: "Transactions or Fin",
}),
}
}
}

impl From<ProposalInit> for ProposalPart {
fn from(value: ProposalInit) -> Self {
ProposalPart::Init(value)
}
}

impl<T> std::fmt::Display for StreamMessage<T>
where
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
Expand All @@ -125,51 +146,51 @@ where
}

// TODO(Guy): Remove after implementing broadcast streams.
#[allow(missing_docs)]
pub struct ProposalWrapper(pub Proposal);

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};
let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
for tx in transactions {
content_sender.try_send(tx).expect("Send should succeed");
}
content_sender.close_channel();

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};

let (_, content_receiver) = mpsc::channel(0);
// This should only be used for Milestone 1, and then removed once streaming is supported.
println!("Cannot build ExecutableTransaction from Transaction.");

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}
// #[allow(missing_docs)]
// pub struct ProposalWrapper(pub Proposal);

// impl From<ProposalWrapper>
// for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
// {
// fn from(val: ProposalWrapper) -> Self {
// let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
// let proposal_init = ProposalInit {
// height: BlockNumber(val.0.height),
// round: val.0.round,
// proposer: val.0.proposer,
// valid_round: val.0.valid_round,
// };
// let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
// for tx in transactions {
// content_sender.try_send(tx).expect("Send should succeed");
// }
// content_sender.close_channel();

// let (fin_sender, fin_receiver) = oneshot::channel();
// fin_sender.send(val.0.block_hash).expect("Send should succeed");

// (proposal_init, content_receiver, fin_receiver)
// }
// }

// impl From<ProposalWrapper>
// for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
// {
// fn from(val: ProposalWrapper) -> Self {
// let proposal_init = ProposalInit {
// height: BlockNumber(val.0.height),
// round: val.0.round,
// proposer: val.0.proposer,
// valid_round: val.0.valid_round,
// };

// let (_, content_receiver) = mpsc::channel(0);
// // This should only be used for Milestone 1, and then removed once streaming is
// supported. println!("Cannot build ExecutableTransaction from Transaction.");

// let (fin_sender, fin_receiver) = oneshot::channel();
// fin_sender.send(val.0.block_hash).expect("Send should succeed");

// (proposal_init, content_receiver, fin_receiver)
// }
// }
7 changes: 7 additions & 0 deletions crates/papyrus_protobuf/src/converters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ pub enum ProtobufConversionError {
MissingField { field_description: &'static str },
#[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")]
BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec<u8> },
#[error("Type `{type_description}` got unexpected enum variant {value_as_str}")]
WrongEnumVariant {
type_description: &'static str,
value_as_str: String,
expected: &'static str,
got: &'static str,
},
#[error(transparent)]
DecodeError(#[from] DecodeError),
/// For CompressionError and serde_json::Error we put the string of the error instead of the
Expand Down
Loading

0 comments on commit c994b3b

Please sign in to comment.