feat: allow a streamed proposal channel on top of existing one
guy-starkware committed Nov 25, 2024
1 parent 2ff3249 commit e7feca7
Showing 23 changed files with 455 additions and 115 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions crates/papyrus_node/Cargo.toml
@@ -39,6 +39,7 @@ papyrus_consensus_orchestrator.workspace = true
papyrus_monitoring_gateway.workspace = true
papyrus_network.workspace = true
papyrus_p2p_sync.workspace = true
papyrus_protobuf.workspace = true
papyrus_rpc = { workspace = true, optional = true }
papyrus_storage.workspace = true
papyrus_sync.workspace = true
19 changes: 18 additions & 1 deletion crates/papyrus_node/src/run.rs
@@ -13,14 +13,16 @@ use papyrus_common::pending_classes::PendingClasses;
use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
@@ -49,6 +51,7 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO;
// different genesis hash.
// TODO: Consider moving to a more general place.
const GENESIS_HASH: &str = "0x0";
pub const NETWORK_TOPIC: &str = "consensus_proposals";

// TODO(dvir): add this to config.
// Duration between updates to the storage metrics (those in the collect_storage_metrics function).
@@ -185,12 +188,25 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let proposal_network_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>> =
network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = proposal_network_channels;

let (outbound_internal_sender, inbound_internal_receiver) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.broadcast_topic_client.clone(),
// outbound_network_sender.clone(),
outbound_internal_sender,
config.num_validators,
None,
);

Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
@@ -199,6 +215,7 @@ config.consensus_delay,
config.consensus_delay,
config.timeouts.clone(),
network_channels.into(),
inbound_internal_receiver,
futures::stream::pending(),
)
.await?)
132 changes: 79 additions & 53 deletions crates/papyrus_protobuf/src/consensus.rs
@@ -2,10 +2,11 @@ use futures::channel::{mpsc, oneshot};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
use starknet_api::transaction::Transaction;
use starknet_api::transaction::{Transaction, TransactionHash};

use crate::converters::ProtobufConversionError;

// TODO(guyn): remove this once we integrate ProposalPart everywhere.
#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)]
pub struct Proposal {
pub height: u64,
@@ -34,7 +35,7 @@ pub struct Vote {

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal),
Proposal(Proposal), // To be deprecated
Vote(Vote),
}

@@ -60,6 +61,56 @@ pub struct StreamMessage<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufCon
pub message_id: u64,
}

// TODO(Guy): Remove after implementing broadcast streams.
#[allow(missing_docs)]
pub struct ProposalWrapper(pub Proposal);

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};
let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
for tx in transactions {
content_sender.try_send(tx).expect("Send should succeed");
}
content_sender.close_channel();

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};

let (_, content_receiver) = mpsc::channel(0);
// This should only be used for Milestone 1, and then removed once streaming is supported.
println!("Cannot build ExecutableTransaction from Transaction.");

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}

/// This message must be sent first when proposing a new block.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct ProposalInit {
@@ -78,9 +129,12 @@ pub struct ProposalInit {
pub struct TransactionBatch {
/// The transactions in the batch.
pub transactions: Vec<Transaction>,
// TODO(guyn): remove this once we know how to get hashes as part of the compilation.
/// The transaction's hashes.
pub tx_hashes: Vec<TransactionHash>,
}

/// The propsal is done when receiving this fin message, which contains the block hash.
/// The proposal is done when receiving this fin message, which contains the block hash.
#[derive(Debug, Clone, PartialEq)]
pub struct ProposalFin {
/// The block hash of the proposed block.
@@ -99,6 +153,28 @@ pub enum ProposalPart {
Fin(ProposalFin),
}

impl TryInto<ProposalInit> for ProposalPart {
type Error = ProtobufConversionError;

fn try_into(self: ProposalPart) -> Result<ProposalInit, Self::Error> {
match self {
ProposalPart::Init(init) => Ok(init),
_ => Err(ProtobufConversionError::WrongEnumVariant {
type_description: "ProposalPart",
value_as_str: format!("{:?}", self),
expected: "Init",
got: "Transactions or Fin",
}),
}
}
}

impl From<ProposalInit> for ProposalPart {
fn from(value: ProposalInit) -> Self {
ProposalPart::Init(value)
}
}

impl<T> std::fmt::Display for StreamMessage<T>
where
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
@@ -123,53 +199,3 @@ where
}
}
}

// TODO(Guy): Remove after implementing broadcast streams.
#[allow(missing_docs)]
pub struct ProposalWrapper(pub Proposal);

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};
let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
for tx in transactions {
content_sender.try_send(tx).expect("Send should succeed");
}
content_sender.close_channel();

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}

impl From<ProposalWrapper>
for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
{
fn from(val: ProposalWrapper) -> Self {
let proposal_init = ProposalInit {
height: BlockNumber(val.0.height),
round: val.0.round,
proposer: val.0.proposer,
valid_round: val.0.valid_round,
};

let (_, content_receiver) = mpsc::channel(0);
// This should only be used for Milestone 1, and then removed once streaming is supported.
println!("Cannot build ExecutableTransaction from Transaction.");

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}
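
As a reading aid for the From<ProposalWrapper> impls above: they buffer the whole proposal into an mpsc channel sized to fit it, close the sender, and pre-resolve a oneshot with the block hash. The standalone sketch below reproduces that pattern with plain u64 payloads in place of Transaction and BlockHash (an assumption made only so it runs without the starknet_api types).

```rust
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;

// Same pattern as From<ProposalWrapper>, with u64 standing in for the real payload types:
// size the channel to the payload, push everything up front, close it, and resolve the
// "fin" oneshot immediately so both receivers can be handed off as a finished proposal.
fn wrap(items: Vec<u64>, fin: u64) -> (mpsc::Receiver<u64>, oneshot::Receiver<u64>) {
    let (mut content_sender, content_receiver) = mpsc::channel(items.len());
    for item in items {
        content_sender.try_send(item).expect("channel is sized to fit all items");
    }
    content_sender.close_channel();

    let (fin_sender, fin_receiver) = oneshot::channel();
    fin_sender.send(fin).expect("receiver was just created");
    (content_receiver, fin_receiver)
}

fn main() {
    let (mut content, fin) = wrap(vec![1, 2, 3], 42);
    futures::executor::block_on(async {
        while let Some(item) = content.next().await {
            println!("content item: {item}");
        }
        println!("fin: {}", fin.await.expect("sender already resolved"));
    });
}
```

The second impl, which pairs an empty channel with the fin receiver, follows the same idea; as its comment notes, it is a Milestone 1 placeholder until ExecutableTransaction streaming is supported.
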
17 changes: 14 additions & 3 deletions crates/papyrus_protobuf/src/converters/consensus.rs
@@ -6,7 +6,8 @@ use std::convert::{TryFrom, TryInto};
use prost::Message;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::hash::StarkHash;
use starknet_api::transaction::Transaction;
use starknet_api::transaction::{Transaction, TransactionHash};
use starknet_types_core::felt::Felt;

use crate::consensus::{
ConsensusMessage,
@@ -226,6 +227,8 @@ impl From<ProposalInit> for protobuf::ProposalInit {

auto_impl_into_and_try_from_vec_u8!(ProposalInit, protobuf::ProposalInit);

// TODO(guyn): remove tx_hashes once we know how to compile the hashes
// when making the executable transactions.
impl TryFrom<protobuf::TransactionBatch> for TransactionBatch {
type Error = ProtobufConversionError;
fn try_from(value: protobuf::TransactionBatch) -> Result<Self, Self::Error> {
@@ -234,14 +237,21 @@ impl TryFrom<protobuf::TransactionBatch> for TransactionBatch {
.into_iter()
.map(|tx| tx.try_into())
.collect::<Result<Vec<Transaction>, ProtobufConversionError>>()?;
Ok(TransactionBatch { transactions })
let tx_felts = value
.tx_hashes
.into_iter()
.map(|hash| hash.try_into())
.collect::<Result<Vec<Felt>, ProtobufConversionError>>()?;
let tx_hashes = tx_felts.into_iter().map(TransactionHash).collect();
Ok(TransactionBatch { transactions, tx_hashes })
}
}

impl From<TransactionBatch> for protobuf::TransactionBatch {
fn from(value: TransactionBatch) -> Self {
let transactions = value.transactions.into_iter().map(Into::into).collect();
protobuf::TransactionBatch { transactions }
let tx_hashes = value.tx_hashes.into_iter().map(|hash| hash.0.into()).collect();
protobuf::TransactionBatch { transactions, tx_hashes }
}
}

@@ -304,6 +314,7 @@ impl From<ProposalPart> for protobuf::ProposalPart {

auto_impl_into_and_try_from_vec_u8!(ProposalPart, protobuf::ProposalPart);

// TODO(guyn): remove this once we are happy with how proposals are sent separate from votes.
impl TryFrom<protobuf::ConsensusMessage> for ConsensusMessage {
type Error = ProtobufConversionError;

7 changes: 7 additions & 0 deletions crates/papyrus_protobuf/src/converters/mod.rs
@@ -22,6 +22,13 @@ pub enum ProtobufConversionError {
MissingField { field_description: &'static str },
#[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")]
BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec<u8> },
#[error("Type `{type_description}` got unexpected enum variant {value_as_str}")]
WrongEnumVariant {
type_description: &'static str,
value_as_str: String,
expected: &'static str,
got: &'static str,
},
#[error(transparent)]
DecodeError(#[from] DecodeError),
/// For CompressionError and serde_json::Error we put the string of the error instead of the
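
The WrongEnumVariant variant added above is what the TryInto<ProposalInit> impl in consensus.rs returns when the part is not an Init. A minimal sketch of the round trip, assuming papyrus_protobuf is available as a dependency:

```rust
use papyrus_protobuf::consensus::{ProposalInit, ProposalPart};

fn main() {
    // Wrapping an Init and unwrapping it again round-trips losslessly.
    let init = ProposalInit::default();
    let part: ProposalPart = init.clone().into();
    let back: ProposalInit = part.try_into().expect("an Init part converts back");
    assert_eq!(init, back);

    // Converting a Transactions or Fin part the same way would instead return
    // ProtobufConversionError::WrongEnumVariant { expected: "Init", .. }.
}
```
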
3 changes: 2 additions & 1 deletion crates/papyrus_protobuf/src/converters/test_instances.rs
@@ -2,7 +2,7 @@ use papyrus_test_utils::{auto_impl_get_test_instance, get_number_of_variants, Ge
use rand::Rng;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use starknet_api::transaction::{Transaction, TransactionHash};

use crate::consensus::{
ConsensusMessage,
@@ -52,6 +52,7 @@ }
}
pub struct TransactionBatch {
pub transactions: Vec<Transaction>,
pub tx_hashes: Vec<TransactionHash>,
}
pub enum ProposalPart {
Init(ProposalInit) = 0,
3 changes: 3 additions & 0 deletions crates/papyrus_protobuf/src/proto/p2p/proto/consensus.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
import "p2p/proto/transaction.proto";
import "p2p/proto/common.proto";

// To be deprecated
message Proposal {
uint64 height = 1;
uint32 round = 2;
@@ -54,6 +55,8 @@ message ProposalInit {

message TransactionBatch {
repeated Transaction transactions = 1;
// TODO(guyn): remove this once we know how to calculate hashes
repeated Felt252 tx_hashes = 2;
}

message ProposalFin {
