From e7feca75166f1287a9b0f9826106daacc5dfab14 Mon Sep 17 00:00:00 2001 From: Guy Nir Date: Sun, 17 Nov 2024 15:22:44 +0200 Subject: [PATCH] feat: allow a streamed proposal channel on top of existing one --- Cargo.lock | 1 + crates/papyrus_node/Cargo.toml | 1 + crates/papyrus_node/src/run.rs | 19 ++- crates/papyrus_protobuf/src/consensus.rs | 132 +++++++++++------- .../src/converters/consensus.rs | 17 ++- crates/papyrus_protobuf/src/converters/mod.rs | 7 + .../src/converters/test_instances.rs | 3 +- .../src/proto/p2p/proto/consensus.proto | 3 + .../papyrus_consensus/src/manager.rs | 16 ++- .../papyrus_consensus/src/manager_test.rs | 52 ++++++- .../papyrus_consensus/src/stream_handler.rs | 42 +++++- .../src/stream_handler_test.rs | 2 +- .../papyrus_consensus/src/test_utils.rs | 10 +- .../sequencing/papyrus_consensus/src/types.rs | 8 +- .../src/papyrus_consensus_context.rs | 31 ++-- .../src/papyrus_consensus_context_test.rs | 22 ++- .../src/sequencer_consensus_context.rs | 7 +- .../src/sequencer_consensus_context_test.rs | 58 +++++++- crates/starknet_api/src/test_utils.rs | 18 ++- crates/starknet_api/src/transaction.rs | 39 ++++++ .../starknet_api/src/transaction_hash_test.rs | 20 +-- crates/starknet_api/src/transaction_test.rs | 28 ++++ .../src/consensus_manager.rs | 34 ++++- 23 files changed, 455 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 481e0e0188..97074ca755 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7430,6 +7430,7 @@ dependencies = [ "papyrus_monitoring_gateway", "papyrus_network", "papyrus_p2p_sync", + "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "papyrus_sync", diff --git a/crates/papyrus_node/Cargo.toml b/crates/papyrus_node/Cargo.toml index 8f25b05355..cb290bfd6b 100644 --- a/crates/papyrus_node/Cargo.toml +++ b/crates/papyrus_node/Cargo.toml @@ -39,6 +39,7 @@ papyrus_consensus_orchestrator.workspace = true papyrus_monitoring_gateway.workspace = true papyrus_network.workspace = true papyrus_p2p_sync.workspace = true +papyrus_protobuf.workspace = true papyrus_rpc = { workspace = true, optional = true } papyrus_storage.workspace = true papyrus_sync.workspace = true diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index 2ddd329799..42f6f4dd98 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -13,14 +13,16 @@ use papyrus_common::pending_classes::PendingClasses; use papyrus_config::presentation::get_config_presentation; use papyrus_config::validators::config_validate; use papyrus_consensus::config::ConsensusConfig; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::NetworkManager; +use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; #[cfg(feature = "rpc")] use papyrus_rpc::run_server; use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; @@ -49,6 +51,7 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO; // different genesis hash. // TODO: Consider moving to a more general place. const GENESIS_HASH: &str = "0x0"; +pub const NETWORK_TOPIC: &str = "consensus_proposals"; // TODO(dvir): add this to config. // Duration between updates to the storage metrics (those in the collect_storage_metrics function). @@ -185,12 +188,25 @@ fn spawn_consensus( let network_channels = network_manager .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; + let proposal_network_channels: BroadcastTopicChannels> = + network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?; + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = proposal_network_channels; + + let (outbound_internal_sender, inbound_internal_receiver) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let context = PapyrusConsensusContext::new( storage_reader.clone(), network_channels.broadcast_topic_client.clone(), + // outbound_network_sender.clone(), + outbound_internal_sender, config.num_validators, None, ); + Ok(tokio::spawn(async move { Ok(papyrus_consensus::run_consensus( context, @@ -199,6 +215,7 @@ fn spawn_consensus( config.consensus_delay, config.timeouts.clone(), network_channels.into(), + inbound_internal_receiver, futures::stream::pending(), ) .await?) diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index d4156ce270..a2b2d740b6 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -2,10 +2,11 @@ use futures::channel::{mpsc, oneshot}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::ContractAddress; use starknet_api::executable_transaction::Transaction as ExecutableTransaction; -use starknet_api::transaction::Transaction; +use starknet_api::transaction::{Transaction, TransactionHash}; use crate::converters::ProtobufConversionError; +// TODO(guyn): remove this once we integrate ProposalPart everywhere. #[derive(Debug, Default, Hash, Clone, Eq, PartialEq)] pub struct Proposal { pub height: u64, @@ -34,7 +35,7 @@ pub struct Vote { #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum ConsensusMessage { - Proposal(Proposal), + Proposal(Proposal), // To be deprecated Vote(Vote), } @@ -60,6 +61,56 @@ pub struct StreamMessage> + TryFrom, Error = ProtobufCon pub message_id: u64, } +// TODO(Guy): Remove after implementing broadcast streams. +#[allow(missing_docs)] +pub struct ProposalWrapper(pub Proposal); + +impl From + for (ProposalInit, mpsc::Receiver, oneshot::Receiver) +{ + fn from(val: ProposalWrapper) -> Self { + let transactions: Vec = val.0.transactions.into_iter().collect(); + let proposal_init = ProposalInit { + height: BlockNumber(val.0.height), + round: val.0.round, + proposer: val.0.proposer, + valid_round: val.0.valid_round, + }; + let (mut content_sender, content_receiver) = mpsc::channel(transactions.len()); + for tx in transactions { + content_sender.try_send(tx).expect("Send should succeed"); + } + content_sender.close_channel(); + + let (fin_sender, fin_receiver) = oneshot::channel(); + fin_sender.send(val.0.block_hash).expect("Send should succeed"); + + (proposal_init, content_receiver, fin_receiver) + } +} + +impl From + for (ProposalInit, mpsc::Receiver>, oneshot::Receiver) +{ + fn from(val: ProposalWrapper) -> Self { + let proposal_init = ProposalInit { + height: BlockNumber(val.0.height), + round: val.0.round, + proposer: val.0.proposer, + valid_round: val.0.valid_round, + }; + + let (_, content_receiver) = mpsc::channel(0); + // This should only be used for Milestone 1, and then removed once streaming is supported. + println!("Cannot build ExecutableTransaction from Transaction."); + + let (fin_sender, fin_receiver) = oneshot::channel(); + fin_sender.send(val.0.block_hash).expect("Send should succeed"); + + (proposal_init, content_receiver, fin_receiver) + } +} + /// This message must be sent first when proposing a new block. #[derive(Default, Debug, Clone, PartialEq)] pub struct ProposalInit { @@ -78,9 +129,12 @@ pub struct ProposalInit { pub struct TransactionBatch { /// The transactions in the batch. pub transactions: Vec, + // TODO(guyn): remove this once we know how to get hashes as part of the compilation. + /// The transaction's hashes. + pub tx_hashes: Vec, } -/// The propsal is done when receiving this fin message, which contains the block hash. +/// The proposal is done when receiving this fin message, which contains the block hash. #[derive(Debug, Clone, PartialEq)] pub struct ProposalFin { /// The block hash of the proposed block. @@ -99,6 +153,28 @@ pub enum ProposalPart { Fin(ProposalFin), } +impl TryInto for ProposalPart { + type Error = ProtobufConversionError; + + fn try_into(self: ProposalPart) -> Result { + match self { + ProposalPart::Init(init) => Ok(init), + _ => Err(ProtobufConversionError::WrongEnumVariant { + type_description: "ProposalPart", + value_as_str: format!("{:?}", self), + expected: "Init", + got: "Transactions or Fin", + }), + } + } +} + +impl From for ProposalPart { + fn from(value: ProposalInit) -> Self { + ProposalPart::Init(value) + } +} + impl std::fmt::Display for StreamMessage where T: Clone + Into> + TryFrom, Error = ProtobufConversionError>, @@ -123,53 +199,3 @@ where } } } - -// TODO(Guy): Remove after implementing broadcast streams. -#[allow(missing_docs)] -pub struct ProposalWrapper(pub Proposal); - -impl From - for (ProposalInit, mpsc::Receiver, oneshot::Receiver) -{ - fn from(val: ProposalWrapper) -> Self { - let transactions: Vec = val.0.transactions.into_iter().collect(); - let proposal_init = ProposalInit { - height: BlockNumber(val.0.height), - round: val.0.round, - proposer: val.0.proposer, - valid_round: val.0.valid_round, - }; - let (mut content_sender, content_receiver) = mpsc::channel(transactions.len()); - for tx in transactions { - content_sender.try_send(tx).expect("Send should succeed"); - } - content_sender.close_channel(); - - let (fin_sender, fin_receiver) = oneshot::channel(); - fin_sender.send(val.0.block_hash).expect("Send should succeed"); - - (proposal_init, content_receiver, fin_receiver) - } -} - -impl From - for (ProposalInit, mpsc::Receiver>, oneshot::Receiver) -{ - fn from(val: ProposalWrapper) -> Self { - let proposal_init = ProposalInit { - height: BlockNumber(val.0.height), - round: val.0.round, - proposer: val.0.proposer, - valid_round: val.0.valid_round, - }; - - let (_, content_receiver) = mpsc::channel(0); - // This should only be used for Milestone 1, and then removed once streaming is supported. - println!("Cannot build ExecutableTransaction from Transaction."); - - let (fin_sender, fin_receiver) = oneshot::channel(); - fin_sender.send(val.0.block_hash).expect("Send should succeed"); - - (proposal_init, content_receiver, fin_receiver) - } -} diff --git a/crates/papyrus_protobuf/src/converters/consensus.rs b/crates/papyrus_protobuf/src/converters/consensus.rs index ad80418e65..16adfac9b1 100644 --- a/crates/papyrus_protobuf/src/converters/consensus.rs +++ b/crates/papyrus_protobuf/src/converters/consensus.rs @@ -6,7 +6,8 @@ use std::convert::{TryFrom, TryInto}; use prost::Message; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::hash::StarkHash; -use starknet_api::transaction::Transaction; +use starknet_api::transaction::{Transaction, TransactionHash}; +use starknet_types_core::felt::Felt; use crate::consensus::{ ConsensusMessage, @@ -226,6 +227,8 @@ impl From for protobuf::ProposalInit { auto_impl_into_and_try_from_vec_u8!(ProposalInit, protobuf::ProposalInit); +// TODO(guyn): remove tx_hashes once we know how to compile the hashes +// when making the executable transactions. impl TryFrom for TransactionBatch { type Error = ProtobufConversionError; fn try_from(value: protobuf::TransactionBatch) -> Result { @@ -234,14 +237,21 @@ impl TryFrom for TransactionBatch { .into_iter() .map(|tx| tx.try_into()) .collect::, ProtobufConversionError>>()?; - Ok(TransactionBatch { transactions }) + let tx_felts = value + .tx_hashes + .into_iter() + .map(|hash| hash.try_into()) + .collect::, ProtobufConversionError>>()?; + let tx_hashes = tx_felts.into_iter().map(TransactionHash).collect(); + Ok(TransactionBatch { transactions, tx_hashes }) } } impl From for protobuf::TransactionBatch { fn from(value: TransactionBatch) -> Self { let transactions = value.transactions.into_iter().map(Into::into).collect(); - protobuf::TransactionBatch { transactions } + let tx_hashes = value.tx_hashes.into_iter().map(|hash| hash.0.into()).collect(); + protobuf::TransactionBatch { transactions, tx_hashes } } } @@ -304,6 +314,7 @@ impl From for protobuf::ProposalPart { auto_impl_into_and_try_from_vec_u8!(ProposalPart, protobuf::ProposalPart); +// TODO(guyn): remove this once we are happy with how proposals are sent separate from votes. impl TryFrom for ConsensusMessage { type Error = ProtobufConversionError; diff --git a/crates/papyrus_protobuf/src/converters/mod.rs b/crates/papyrus_protobuf/src/converters/mod.rs index a72d774d25..c2a19f9d8d 100644 --- a/crates/papyrus_protobuf/src/converters/mod.rs +++ b/crates/papyrus_protobuf/src/converters/mod.rs @@ -22,6 +22,13 @@ pub enum ProtobufConversionError { MissingField { field_description: &'static str }, #[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")] BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec }, + #[error("Type `{type_description}` got unexpected enum variant {value_as_str}")] + WrongEnumVariant { + type_description: &'static str, + value_as_str: String, + expected: &'static str, + got: &'static str, + }, #[error(transparent)] DecodeError(#[from] DecodeError), /// For CompressionError and serde_json::Error we put the string of the error instead of the diff --git a/crates/papyrus_protobuf/src/converters/test_instances.rs b/crates/papyrus_protobuf/src/converters/test_instances.rs index 742349b973..01c9c1903a 100644 --- a/crates/papyrus_protobuf/src/converters/test_instances.rs +++ b/crates/papyrus_protobuf/src/converters/test_instances.rs @@ -2,7 +2,7 @@ use papyrus_test_utils::{auto_impl_get_test_instance, get_number_of_variants, Ge use rand::Rng; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::ContractAddress; -use starknet_api::transaction::Transaction; +use starknet_api::transaction::{Transaction, TransactionHash}; use crate::consensus::{ ConsensusMessage, @@ -52,6 +52,7 @@ auto_impl_get_test_instance! { } pub struct TransactionBatch { pub transactions: Vec, + pub tx_hashes: Vec, } pub enum ProposalPart { Init(ProposalInit) = 0, diff --git a/crates/papyrus_protobuf/src/proto/p2p/proto/consensus.proto b/crates/papyrus_protobuf/src/proto/p2p/proto/consensus.proto index 8c31067f97..81e5af8d2c 100644 --- a/crates/papyrus_protobuf/src/proto/p2p/proto/consensus.proto +++ b/crates/papyrus_protobuf/src/proto/p2p/proto/consensus.proto @@ -2,6 +2,7 @@ syntax = "proto3"; import "p2p/proto/transaction.proto"; import "p2p/proto/common.proto"; +// To be deprecated message Proposal { uint64 height = 1; uint32 round = 2; @@ -54,6 +55,8 @@ message ProposalInit { message TransactionBatch { repeated Transaction transactions = 1; + // TODO(guyn): remove this once we know how to calculate hashes + repeated Felt252 tx_hashes = 2; } message ProposalFin { diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 5dfa5c7ad5..fe5ad992da 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -37,13 +37,14 @@ pub async fn run_consensus( consensus_delay: Duration, timeouts: TimeoutsConfig, mut broadcast_channels: BroadcastConsensusMessageChannel, + mut inbound_proposal_receiver: mpsc::Receiver>, mut sync_receiver: SyncReceiverT, ) -> Result<(), ConsensusError> where ContextT: ConsensusContext, - SyncReceiverT: Stream + Unpin, ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, + SyncReceiverT: Stream + Unpin, { info!( "Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}", @@ -61,7 +62,12 @@ where loop { metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64); - let run_height = manager.run_height(&mut context, current_height, &mut broadcast_channels); + let run_height = manager.run_height( + &mut context, + current_height, + &mut broadcast_channels, + &mut inbound_proposal_receiver, + ); // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop // it. We also cannot restart the height; when we dropped the future we dropped the state it @@ -106,6 +112,7 @@ impl MultiHeightManager { context: &mut ContextT, height: BlockNumber, broadcast_channels: &mut BroadcastConsensusMessageChannel, + proposal_receiver: &mut mpsc::Receiver>, ) -> Result where ContextT: ConsensusContext, @@ -186,6 +193,7 @@ impl MultiHeightManager { match message { ConsensusMessage::Proposal(proposal) => { // Special case due to fake streaming. + // TODO(guyn): this will be gone once we integrate the proposal channels. let (proposal_init, content_receiver, fin_receiver) = ProposalWrapper(proposal).into(); let res = shc @@ -224,9 +232,7 @@ impl MultiHeightManager { async fn next_message( cached_messages: &mut Vec, broadcast_channels: &mut BroadcastConsensusMessageChannel, -) -> Result -where -{ +) -> Result { let BroadcastConsensusMessageChannel { broadcasted_messages_receiver, broadcast_topic_client } = broadcast_channels; if let Some(msg) = cached_messages.pop() { diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 3f94d78fb4..374d35bd88 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -13,7 +13,7 @@ use papyrus_network::network_manager::test_utils::{ TestSubscriberChannels, }; use papyrus_network_types::network_types::BroadcastedMessageMetadata; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalPart, Vote}; use papyrus_test_utils::{get_rng, GetTestInstance}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::transaction::Transaction; @@ -32,12 +32,15 @@ lazy_static! { static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default(); } +const CHANNEL_SIZE: usize = 10; + mock! { pub TestContext {} #[async_trait] impl ConsensusContext for TestContext { type ProposalChunk = Transaction; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, @@ -83,6 +86,11 @@ async fn manager_multiple_heights_unordered() { let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().unwrap(); let mut sender = mock_network.broadcasted_messages_sender; + + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) = + mpsc::channel(CHANNEL_SIZE); + // Send messages for height 2 followed by those for height 1. send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await; send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; @@ -107,8 +115,15 @@ async fn manager_multiple_heights_unordered() { let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone()); let mut subscriber_channels = subscriber_channels.into(); - let decision = - manager.run_height(&mut context, BlockNumber(1), &mut subscriber_channels).await.unwrap(); + let decision = manager + .run_height( + &mut context, + BlockNumber(1), + &mut subscriber_channels, + &mut proposal_receiver_receiver, + ) + .await + .unwrap(); assert_eq!(decision.block, BlockHash(Felt::ONE)); // Run the manager for height 2. @@ -120,8 +135,15 @@ async fn manager_multiple_heights_unordered() { block_receiver }) .times(1); - let decision = - manager.run_height(&mut context, BlockNumber(2), &mut subscriber_channels).await.unwrap(); + let decision = manager + .run_height( + &mut context, + BlockNumber(2), + &mut subscriber_channels, + &mut proposal_receiver_receiver, + ) + .await + .unwrap(); assert_eq!(decision.block, BlockHash(Felt::TWO)); } @@ -131,6 +153,9 @@ async fn run_consensus_sync() { let mut context = MockTestContext::new(); let (decision_tx, decision_rx) = oneshot::channel(); + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::TWO)).unwrap(); @@ -164,6 +189,7 @@ async fn run_consensus_sync() { Duration::ZERO, TIMEOUTS.clone(), subscriber_channels.into(), + proposal_receiver_receiver, &mut sync_receiver, ) .await @@ -190,6 +216,9 @@ async fn run_consensus_sync_cancellation_safety() { let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel(); let (decision_tx, decision_rx) = oneshot::channel(); + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::ONE)).unwrap(); @@ -223,6 +252,7 @@ async fn run_consensus_sync_cancellation_safety() { Duration::ZERO, TIMEOUTS.clone(), subscriber_channels.into(), + proposal_receiver_receiver, &mut sync_receiver, ) .await @@ -253,6 +283,11 @@ async fn test_timeouts() { let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().unwrap(); let mut sender = mock_network.broadcasted_messages_sender; + + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) = + mpsc::channel(CHANNEL_SIZE); + send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await; send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await; send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await; @@ -285,7 +320,12 @@ async fn test_timeouts() { let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone()); let manager_handle = tokio::spawn(async move { let decision = manager - .run_height(&mut context, BlockNumber(1), &mut subscriber_channels.into()) + .run_height( + &mut context, + BlockNumber(1), + &mut subscriber_channels.into(), + &mut proposal_receiver_receiver, + ) .await .unwrap(); assert_eq!(decision.block, BlockHash(Felt::ONE)); diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler.rs b/crates/sequencing/papyrus_consensus/src/stream_handler.rs index 0531ba48a6..a1ae9859ba 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler.rs @@ -30,7 +30,9 @@ type StreamKey = (PeerId, StreamId); const CHANNEL_BUFFER_LENGTH: usize = 100; #[derive(Debug, Clone)] -struct StreamData> + TryFrom, Error = ProtobufConversionError>> { +struct StreamData< + T: Clone + Into> + TryFrom, Error = ProtobufConversionError> + 'static, +> { next_message_id: MessageId, // Last message ID. If None, it means we have not yet gotten to it. fin_message_id: Option, @@ -56,7 +58,7 @@ impl> + TryFrom, Error = ProtobufConversionError /// - Buffering inbound messages and reporting them to the application in order. /// - Sending outbound messages to the network, wrapped in StreamMessage. pub struct StreamHandler< - T: Clone + Into> + TryFrom, Error = ProtobufConversionError>, + T: Clone + Into> + TryFrom, Error = ProtobufConversionError> + 'static, > { // For each stream ID from the network, send the application a Receiver // that will receive the messages in order. This allows sending such Receivers. @@ -100,6 +102,42 @@ impl> + TryFrom, Error = ProtobufConversi } } + /// Create a new StreamHandler and start it running in a new task. + /// Gets network input/output channels and returns application input/output channels. + #[allow(clippy::type_complexity)] + pub fn get_channels( + inbound_network_receiver: BroadcastTopicServer>, + outbound_network_sender: BroadcastTopicClient>, + ) -> (mpsc::Sender<(StreamId, mpsc::Receiver)>, mpsc::Receiver>) { + // The inbound messages come into StreamHandler via inbound_network_receiver, + // and are forwarded to the consensus via inbound_internal_receiver + // (the StreamHandler keeps the inbound_internal_sender to pass messsage). + let (inbound_internal_sender, inbound_internal_receiver): ( + mpsc::Sender>, + mpsc::Receiver>, + ) = mpsc::channel(CHANNEL_BUFFER_LENGTH); + // The outbound messages that an application would like to send are: + // 1. Sent into outbound_internal_sender as tuples of (StreamId, Receiver) + // 2. Ingested by StreamHandler by its outbound_internal_receiver. + // 3. Broadcast by the StreamHandler using its outbound_network_sender. + let (outbound_internal_sender, outbound_internal_receiver): ( + mpsc::Sender<(StreamId, mpsc::Receiver)>, + mpsc::Receiver<(StreamId, mpsc::Receiver)>, + ) = mpsc::channel(CHANNEL_BUFFER_LENGTH); + + let mut stream_handler = StreamHandler::::new( + inbound_internal_sender, // Sender>, + inbound_network_receiver, // BroadcastTopicServer>, + outbound_internal_receiver, // Receiver<(StreamId, Receiver)>, + outbound_network_sender, // BroadcastTopicClient> + ); + tokio::spawn(async move { + stream_handler.run().await; + }); + + (outbound_internal_sender, inbound_internal_receiver) + } + /// Listen for messages coming from the network and from the application. /// - Outbound messages are wrapped as StreamMessage and sent to the network directly. /// - Inbound messages are stripped of StreamMessage and buffered until they can be sent in the diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs index 0bd7250f12..4bd575da8d 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs @@ -83,7 +83,7 @@ mod tests { mock_register_broadcast_topic().unwrap(); let network_sender_to_inbound = mock_network.broadcasted_messages_sender; - // The inbound_receiver is given to StreamHandler to inbound to mock network messages. + // The inbound_receiver is given to StreamHandler to mock network messages. let BroadcastTopicChannels { broadcasted_messages_receiver: inbound_receiver, broadcast_topic_client: _, diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 303e30a049..980f5ee976 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -3,7 +3,14 @@ use std::time::Duration; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use mockall::mock; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, Vote, VoteType}; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + Proposal, + ProposalInit, + ProposalPart, + Vote, + VoteType, +}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_types_core::felt::Felt; @@ -23,6 +30,7 @@ mock! { #[async_trait] impl ConsensusContext for TestContext { type ProposalChunk = u32; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index a4fcd53f31..751cb4b4ed 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -29,7 +29,13 @@ pub trait ConsensusContext { /// The chunks of content returned when iterating the proposal. // In practice I expect this to match the type sent to the network // (papyrus_protobuf::ConsensusMessage), and not to be specific to just the block's content. - type ProposalChunk; + type ProposalChunk; // TODO(guyn): deprecate this (and replace by ProposalPart) + type ProposalPart: TryFrom, Error = ProtobufConversionError> + + Into> + + TryInto + + From + + Clone + + Send; // TODO(matan): The oneshot for receiving the build block could be generalized to just be some // future which returns a block. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 5779d8a156..401a661293 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -23,7 +23,7 @@ use papyrus_consensus::types::{ ValidatorId, }; use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, Vote}; +use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote}; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader}; @@ -39,6 +39,7 @@ type HeightToIdToContent = BTreeMap, + _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, validators: Vec, sync_broadcast_sender: Option>, // Proposal building/validating returns immediately, leaving the actual processing to a spawned @@ -51,12 +52,14 @@ impl PapyrusConsensusContext { pub fn new( storage_reader: StorageReader, network_broadcast_client: BroadcastTopicClient, + _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, num_validators: u64, sync_broadcast_sender: Option>, ) -> Self { Self { storage_reader, network_broadcast_client, + _network_proposal_sender, validators: (0..num_validators).map(ContractAddress::from).collect(), sync_broadcast_sender, valid_proposals: Arc::new(Mutex::new(BTreeMap::new())), @@ -67,10 +70,11 @@ impl PapyrusConsensusContext { #[async_trait] impl ConsensusContext for PapyrusConsensusContext { type ProposalChunk = Transaction; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, - init: ProposalInit, + proposal_init: ProposalInit, _timeout: Duration, ) -> oneshot::Receiver { let mut network_broadcast_sender = self.network_broadcast_client.clone(); @@ -83,39 +87,39 @@ impl ConsensusContext for PapyrusConsensusContext { // TODO(dvir): consider fix this for the case of reverts. If between the check that // the block in storage and to getting the transaction was a revert // this flow will fail. - wait_for_block(&storage_reader, init.height) + wait_for_block(&storage_reader, proposal_init.height) .await .expect("Failed to wait to block"); let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); let transactions = txn - .get_block_transactions(init.height) + .get_block_transactions(proposal_init.height) .expect("Get transactions from storage failed") .unwrap_or_else(|| { panic!( "Block in {} was not found in storage despite waiting for it", - init.height + proposal_init.height ) }); let block_hash = txn - .get_block_header(init.height) + .get_block_header(proposal_init.height) .expect("Get header from storage failed") .unwrap_or_else(|| { panic!( "Block in {} was not found in storage despite waiting for it", - init.height + proposal_init.height ) }) .block_hash; let proposal = Proposal { - height: init.height.0, - round: init.round, - proposer: init.proposer, + height: proposal_init.height.0, + round: proposal_init.round, + proposer: proposal_init.proposer, transactions: transactions.clone(), block_hash, - valid_round: init.valid_round, + valid_round: proposal_init.valid_round, }; network_broadcast_sender .broadcast_message(ConsensusMessage::Proposal(proposal)) @@ -125,7 +129,10 @@ impl ConsensusContext for PapyrusConsensusContext { let mut proposals = valid_proposals .lock() .expect("Lock on active proposals was poisoned due to a previous panic"); - proposals.entry(init.height).or_default().insert(block_hash, transactions); + proposals + .entry(proposal_init.height) + .or_default() + .insert(block_hash, transactions); } // Done after inserting the proposal into the map to avoid race conditions between // insertion and calls to `repropose`. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index 9c7e5b9f96..8a5e13b3cb 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -2,12 +2,21 @@ use std::time::Duration; use futures::channel::{mpsc, oneshot}; use futures::StreamExt; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::ConsensusContext; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, BroadcastNetworkMock, + TestSubscriberChannels, +}; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + ProposalInit, + ProposalPart, + StreamMessage, + Vote, }; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote}; use papyrus_storage::body::BodyStorageWriter; use papyrus_storage::header::HeaderStorageWriter; use papyrus_storage::test_utils::get_test_storage; @@ -107,10 +116,21 @@ fn test_setup() -> ( .unwrap(); let network_channels = mock_register_broadcast_topic().unwrap(); + let network_proposal_channels: TestSubscriberChannels> = + mock_register_broadcast_topic().unwrap(); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = network_proposal_channels.subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let sync_channels = mock_register_broadcast_topic().unwrap(); + let papyrus_context = PapyrusConsensusContext::new( storage_reader.clone(), network_channels.subscriber_channels.broadcast_topic_client, + outbound_internal_sender, 4, Some(sync_channels.subscriber_channels.broadcast_topic_client), ); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 54e7a8f750..50278b5df3 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -64,17 +64,20 @@ pub struct SequencerConsensusContext { proposal_id: u64, current_height: Option, network_broadcast_client: BroadcastTopicClient, + _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, } impl SequencerConsensusContext { pub fn new( batcher: Arc, network_broadcast_client: BroadcastTopicClient, + _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, num_validators: u64, ) -> Self { Self { batcher, network_broadcast_client, + _outbound_proposal_sender, validators: (0..num_validators).map(ValidatorId::from).collect(), valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())), proposal_id: 0, @@ -85,8 +88,9 @@ impl SequencerConsensusContext { #[async_trait] impl ConsensusContext for SequencerConsensusContext { - // TODO: Switch to ProposalPart when Guy merges the PR. + // TODO(guyn): Switch to ProposalPart when done with the streaming integration. type ProposalChunk = Vec; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, @@ -305,6 +309,7 @@ async fn stream_build_proposal( broadcast_client .broadcast_message(ProposalPart::Transactions(TransactionBatch { transactions, + tx_hashes: transaction_hashes, })) .await .expect("Failed to broadcast proposal content"); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index c5b1c127c5..a4a766a50a 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -5,6 +5,7 @@ use std::vec; use futures::channel::mpsc; use futures::SinkExt; use lazy_static::lazy_static; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::ConsensusContext; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, @@ -79,12 +80,27 @@ async fn build_proposal() { }), }) }); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); let init = ProposalInit { height: BlockNumber(0), round: 0, @@ -132,12 +148,27 @@ async fn validate_proposal_success() { }) }, ); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver = context.validate_proposal(BlockNumber(0), TIMEOUT, content_receiver).await; @@ -170,12 +201,27 @@ async fn repropose() { }) }, ); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); // Receive a valid proposal. let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); diff --git a/crates/starknet_api/src/test_utils.rs b/crates/starknet_api/src/test_utils.rs index d88a44f87a..dd27509fef 100644 --- a/crates/starknet_api/src/test_utils.rs +++ b/crates/starknet_api/src/test_utils.rs @@ -3,9 +3,12 @@ use std::fs::read_to_string; use std::path::{Path, PathBuf}; use infra_utils::path::cargo_manifest_dir; +use serde::{Deserialize, Serialize}; use starknet_types_core::felt::Felt; -use crate::core::{ContractAddress, Nonce}; +use crate::block::BlockNumber; +use crate::core::{ChainId, ContractAddress, Nonce}; +use crate::transaction::{Transaction, TransactionHash}; pub mod declare; pub mod deploy_account; @@ -27,6 +30,19 @@ pub fn read_json_file>(path_in_resource_dir: P) -> serde_json::Va serde_json::from_str(&json_str).unwrap() } +#[derive(Deserialize, Serialize, Debug)] +/// A struct used for reading the transaction test data (e.g., for transaction hash tests). +pub struct TransactionTestData { + /// The actual transaction. + pub transaction: Transaction, + /// The expected transaction hash. + pub transaction_hash: TransactionHash, + /// An optional transaction hash to query. + pub only_query_transaction_hash: Option, + pub chain_id: ChainId, + pub block_number: BlockNumber, +} + #[derive(Debug, Default)] pub struct NonceManager { next_nonce: HashMap, diff --git a/crates/starknet_api/src/transaction.rs b/crates/starknet_api/src/transaction.rs index 42eeadefad..658ba8b1d1 100644 --- a/crates/starknet_api/src/transaction.rs +++ b/crates/starknet_api/src/transaction.rs @@ -134,6 +134,31 @@ impl From for Transaction { } } +impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Transaction { + fn from(tup: (Transaction, TransactionHash)) -> Self { + let (tx, tx_hash) = tup; + match tx { + Transaction::Declare(_tx) => { + unimplemented!("Declare transactions are not supported yet.") + } + Transaction::Deploy(_tx) => { + unimplemented!("Deploy transactions are not supported yet.") + } + Transaction::DeployAccount(_tx) => { + unimplemented!("DeployAccount transactions are not supported yet.") + } + Transaction::Invoke(tx) => crate::executable_transaction::Transaction::Account( + crate::executable_transaction::AccountTransaction::Invoke( + crate::executable_transaction::InvokeTransaction { tx, tx_hash }, + ), + ), + Transaction::L1Handler(_) => { + unimplemented!("L1Handler transactions are not supported yet.") + } + } + } +} + #[derive(Copy, Clone, Debug, Eq, PartialEq, Default)] pub struct TransactionOptions { /// Transaction that shouldn't be broadcasted to StarkNet. For example, users that want to @@ -775,6 +800,20 @@ impl std::fmt::Display for TransactionHash { } } +// TODO(guyn): this is only used for conversion of transactions->executable transactions +// It should be removed once we integrate a proper way to calculate executable transaction hashes +impl From for Vec { + fn from(tx_hash: TransactionHash) -> Vec { + tx_hash.0.to_bytes_be().to_vec() + } +} +impl From> for TransactionHash { + fn from(bytes: Vec) -> TransactionHash { + let array: [u8; 32] = bytes.try_into().expect("Expected a Vec of length 32"); + TransactionHash(StarkHash::from_bytes_be(&array)) + } +} + /// A transaction version. #[derive( Debug, diff --git a/crates/starknet_api/src/transaction_hash_test.rs b/crates/starknet_api/src/transaction_hash_test.rs index 1abec8c3b0..32877f5be3 100644 --- a/crates/starknet_api/src/transaction_hash_test.rs +++ b/crates/starknet_api/src/transaction_hash_test.rs @@ -1,13 +1,10 @@ use pretty_assertions::assert_eq; -use serde::{Deserialize, Serialize}; use sha3::{Digest, Keccak256}; use starknet_types_core::felt::Felt; use super::{get_transaction_hash, validate_transaction_hash, CONSTRUCTOR_ENTRY_POINT_SELECTOR}; -use crate::block::BlockNumber; -use crate::core::ChainId; -use crate::test_utils::read_json_file; -use crate::transaction::{Transaction, TransactionHash, TransactionOptions}; +use crate::test_utils::{read_json_file, TransactionTestData}; +use crate::transaction::{Transaction, TransactionOptions}; #[test] fn test_constructor_selector() { @@ -19,18 +16,9 @@ fn test_constructor_selector() { assert_eq!(constructor_felt, CONSTRUCTOR_ENTRY_POINT_SELECTOR); } -#[derive(Deserialize, Serialize)] -struct TransactionTestData { - transaction: Transaction, - transaction_hash: TransactionHash, - only_query_transaction_hash: Option, - chain_id: ChainId, - block_number: BlockNumber, -} - #[test] fn test_transaction_hash() { - // The details were taken from Starknet Mainnet. You can found the transactions by hash in: + // The details were taken from Starknet Mainnet. You can find the transactions by hash in: // https://alpha-mainnet.starknet.io/feeder_gateway/get_transaction?transactionHash= let transactions_test_data_vec: Vec = serde_json::from_value(read_json_file("transaction_hash.json")).unwrap(); @@ -64,7 +52,7 @@ fn test_transaction_hash() { #[test] fn test_deprecated_transaction_hash() { - // The details were taken from Starknet Mainnet. You can found the transactions by hash in: + // The details were taken from Starknet Mainnet. You can find the transactions by hash in: // https://alpha-mainnet.starknet.io/feeder_gateway/get_transaction?transactionHash= let transaction_test_data_vec: Vec = serde_json::from_value(read_json_file("deprecated_transaction_hash.json")).unwrap(); diff --git a/crates/starknet_api/src/transaction_test.rs b/crates/starknet_api/src/transaction_test.rs index c69096bf72..3c029603b5 100644 --- a/crates/starknet_api/src/transaction_test.rs +++ b/crates/starknet_api/src/transaction_test.rs @@ -1,5 +1,8 @@ +use super::Transaction; use crate::block::NonzeroGasPrice; +use crate::executable_transaction::Transaction as ExecutableTransaction; use crate::execution_resources::GasAmount; +use crate::test_utils::{read_json_file, TransactionTestData}; use crate::transaction::Fee; #[test] @@ -25,3 +28,28 @@ fn test_fee_div_ceil() { Fee(28).checked_div_ceil(NonzeroGasPrice::try_from(3_u8).unwrap()).unwrap() ); } + +#[test] +fn convert_executable_transaction_and_back() { + // The details were taken from Starknet Mainnet. You can find the transactions by hash in: + // https://alpha-mainnet.starknet.io/feeder_gateway/get_transaction?transactionHash= + let mut transactions_test_data_vec: Vec = + serde_json::from_value(read_json_file("transaction_hash.json")).unwrap(); + + let (tx, tx_hash) = loop { + match transactions_test_data_vec.pop() { + Some(data) => { + if let Transaction::Invoke(tx) = data.transaction { + // Do something with the data + break (Transaction::Invoke(tx), data.transaction_hash); + } + } + None => { + panic!("Could not find a single Invoke transaction in the test data"); + } + } + }; + let executable_tx: ExecutableTransaction = (tx.clone(), tx_hash.clone()).into(); + let tx_back = Transaction::from(executable_tx); + assert_eq!(tx, tx_back); +} diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs index 1ee091bff2..fb879c57eb 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager.rs @@ -6,12 +6,17 @@ use futures::channel::mpsc::{self, SendError}; use futures::future::Ready; use futures::SinkExt; use libp2p::PeerId; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError}; use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager}; +use papyrus_network::network_manager::{ + BroadcastTopicChannels, + BroadcastTopicClient, + NetworkManager, +}; use papyrus_network_types::network_types::BroadcastedMessageMetadata; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, StreamMessage}; use starknet_batcher_types::communication::SharedBatcherClient; use starknet_sequencer_infra::component_definitions::ComponentStarter; use starknet_sequencer_infra::errors::ComponentError; @@ -22,6 +27,8 @@ use crate::config::ConsensusManagerConfig; // TODO(Dan, Guy): move to config. pub const BROADCAST_BUFFER_SIZE: usize = 100; pub const NETWORK_TOPIC: &str = "consensus_proposals"; +// TODO(guyn): remove this once we have integrated streaming. +pub const NETWORK_TOPIC2: &str = "streamed_consensus_proposals"; #[derive(Clone)] pub struct ConsensusManager { @@ -37,15 +44,33 @@ impl ConsensusManager { pub async fn run(&self) -> Result<(), ConsensusError> { let mut network_manager = NetworkManager::new(self.config.consensus_config.network_config.clone(), None); - let proposals_broadcast_channels = network_manager + + // TODO(guyn): remove this channel once we have integrated streaming. + let old_proposals_broadcast_channels = network_manager .register_broadcast_topic::( Topic::new(NETWORK_TOPIC), BROADCAST_BUFFER_SIZE, ) .expect("Failed to register broadcast topic"); + + let proposals_broadcast_channels = network_manager + .register_broadcast_topic::>( + Topic::new(NETWORK_TOPIC2), + BROADCAST_BUFFER_SIZE, + ) + .expect("Failed to register broadcast topic"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = proposals_broadcast_channels; + + let (outbound_internal_sender, inbound_internal_receiver) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let context = SequencerConsensusContext::new( Arc::clone(&self.batcher_client), - proposals_broadcast_channels.broadcast_topic_client.clone(), + old_proposals_broadcast_channels.broadcast_topic_client.clone(), + outbound_internal_sender, self.config.consensus_config.num_validators, ); @@ -57,6 +82,7 @@ impl ConsensusManager { self.config.consensus_config.consensus_delay, self.config.consensus_config.timeouts.clone(), create_fake_network_channels(), + inbound_internal_receiver, futures::stream::pending(), );