diff --git a/crates/consensus_manager/src/consensus_manager.rs b/crates/consensus_manager/src/consensus_manager.rs
index 1ee091bff29..1716865ffe8 100644
--- a/crates/consensus_manager/src/consensus_manager.rs
+++ b/crates/consensus_manager/src/consensus_manager.rs
@@ -6,12 +6,17 @@ use futures::channel::mpsc::{self, SendError};
 use futures::future::Ready;
 use futures::SinkExt;
 use libp2p::PeerId;
+use papyrus_consensus::stream_handler::StreamHandler;
 use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError};
 use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
 use papyrus_network::gossipsub_impl::Topic;
-use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager};
+use papyrus_network::network_manager::{
+    BroadcastTopicChannels,
+    BroadcastTopicClient,
+    NetworkManager,
+};
 use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart};
+use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, StreamMessage};
 use starknet_batcher_types::communication::SharedBatcherClient;
 use starknet_sequencer_infra::component_definitions::ComponentStarter;
 use starknet_sequencer_infra::errors::ComponentError;
@@ -38,14 +43,23 @@ impl ConsensusManager {
         let mut network_manager =
             NetworkManager::new(self.config.consensus_config.network_config.clone(), None);
         let proposals_broadcast_channels = network_manager
-            .register_broadcast_topic::<ProposalPart>(
+            .register_broadcast_topic::<StreamMessage<ProposalPart>>(
                 Topic::new(NETWORK_TOPIC),
                 BROADCAST_BUFFER_SIZE,
             )
             .expect("Failed to register broadcast topic");
+        let BroadcastTopicChannels {
+            broadcasted_messages_receiver: inbound_network_receiver,
+            broadcast_topic_client: outbound_network_sender,
+        } = proposals_broadcast_channels;
+
+        let (outbound_internal_sender, inbound_internal_receiver) =
+            StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);
+
         let context = SequencerConsensusContext::new(
             Arc::clone(&self.batcher_client),
-            proposals_broadcast_channels.broadcast_topic_client.clone(),
+            outbound_internal_sender,
             self.config.consensus_config.num_validators,
         );

@@ -57,6 +71,7 @@ impl ConsensusManager {
             self.config.consensus_config.consensus_delay,
             self.config.consensus_config.timeouts.clone(),
             create_fake_network_channels(),
+            inbound_internal_receiver,
             futures::stream::pending(),
         );
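For reviewers: the new wiring in miniature. A minimal sketch, assuming the `StreamHandler::get_channels` signature introduced in stream_handler.rs later in this diff; the helper name, topic string, and buffer size are illustrative only:

```rust
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};

// Hypothetical helper mirroring ConsensusManager::run above.
async fn wire_proposal_streams(network_manager: &mut NetworkManager) {
    // The proposals topic now carries wrapped messages: StreamMessage<ProposalPart>.
    let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } =
        network_manager
            .register_broadcast_topic::<StreamMessage<ProposalPart>>(
                Topic::new("consensus_proposals"),
                100,
            )
            .expect("Failed to register broadcast topic");
    // get_channels spawns the StreamHandler task and returns the application-side ends:
    // a sender of (stream_id, Receiver<ProposalPart>) for outbound streams, and a
    // receiver yielding one Receiver<ProposalPart> per inbound stream.
    let (outbound_internal_sender, inbound_internal_receiver) =
        StreamHandler::get_channels(broadcasted_messages_receiver, broadcast_topic_client);
    // The sender goes to the context; the receiver goes to run_consensus.
    drop((outbound_internal_sender, inbound_internal_receiver));
}
```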
diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs
index d36d045e377..fd864370887 100644
--- a/crates/papyrus_node/src/run.rs
+++ b/crates/papyrus_node/src/run.rs
@@ -13,10 +13,11 @@ use papyrus_common::pending_classes::PendingClasses;
 use papyrus_config::presentation::get_config_presentation;
 use papyrus_config::validators::config_validate;
 use papyrus_consensus::config::ConsensusConfig;
+use papyrus_consensus::stream_handler::StreamHandler;
 use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
 use papyrus_monitoring_gateway::MonitoringServer;
 use papyrus_network::gossipsub_impl::Topic;
-use papyrus_network::network_manager::NetworkManager;
+use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
 use papyrus_network::{network_manager, NetworkConfig};
 use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
 use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
@@ -49,6 +50,7 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO;
 // different genesis hash.
 // TODO: Consider moving to a more general place.
 const GENESIS_HASH: &str = "0x0";
+pub const NETWORK_TOPIC: &str = "consensus_proposals";

 // TODO(dvir): add this to config.
 // Duration between updates to the storage metrics (those in the collect_storage_metrics function).
@@ -185,12 +187,25 @@ fn spawn_consensus(
     let network_channels = network_manager
         .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
+    let proposal_network_channels =
+        network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
+    let BroadcastTopicChannels {
+        broadcasted_messages_receiver: inbound_network_receiver,
+        broadcast_topic_client: outbound_network_sender,
+    } = proposal_network_channels;
+
     let context = PapyrusConsensusContext::new(
         storage_reader.clone(),
-        network_channels.broadcast_topic_client.clone(),
+        // outbound_internal_sender,
+        // network_channels.broadcast_topic_client.clone(),
+        outbound_network_sender.clone(),
         config.num_validators,
         None,
     );
+
+    let (outbound_internal_sender, inbound_internal_receiver) =
+        StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);
+
     Ok(tokio::spawn(async move {
         Ok(papyrus_consensus::run_consensus(
             context,
@@ -199,6 +214,7 @@ fn spawn_consensus(
             config.consensus_delay,
             config.timeouts.clone(),
             network_channels.into(),
+            inbound_internal_receiver,
             futures::stream::pending(),
         )
         .await?)
diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs
index d4156ce2705..5d7a40ce906 100644
--- a/crates/papyrus_protobuf/src/consensus.rs
+++ b/crates/papyrus_protobuf/src/consensus.rs
@@ -1,7 +1,6 @@
-use futures::channel::{mpsc, oneshot};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::core::ContractAddress;
-use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
+// use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
 use starknet_api::transaction::Transaction;

 use crate::converters::ProtobufConversionError;
@@ -34,7 +33,7 @@ pub struct Vote {

 #[derive(Debug, Clone, Hash, Eq, PartialEq)]
 pub enum ConsensusMessage {
-    Proposal(Proposal),
+    Proposal(Proposal), // To be deprecated
     Vote(Vote),
 }

@@ -99,6 +98,28 @@ pub enum ProposalPart {
     Fin(ProposalFin),
 }

+impl TryInto<ProposalInit> for ProposalPart {
+    type Error = ProtobufConversionError;
+
+    fn try_into(self: ProposalPart) -> Result<ProposalInit, Self::Error> {
+        match self {
+            ProposalPart::Init(init) => Ok(init),
+            _ => Err(ProtobufConversionError::WrongEnumVariant {
+                type_description: "ProposalPart",
+                value_as_str: format!("{:?}", self),
+                expected: "Init",
+                got: "Transactions or Fin",
+            }),
+        }
+    }
+}
+
+impl From<ProposalInit> for ProposalPart {
+    fn from(value: ProposalInit) -> Self {
+        ProposalPart::Init(value)
+    }
+}
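A quick sketch of how the two new conversions compose (the helper name is hypothetical; only `Init` parts convert back, and any other variant surfaces the new `WrongEnumVariant` error defined in converters/mod.rs below):

```rust
use papyrus_protobuf::consensus::{ProposalInit, ProposalPart};
use papyrus_protobuf::converters::ProtobufConversionError;

// Init -> ProposalPart is infallible; the reverse direction is fallible by design.
fn roundtrip_init(init: ProposalInit) -> Result<ProposalInit, ProtobufConversionError> {
    let part: ProposalPart = init.into();
    // Returns Err(ProtobufConversionError::WrongEnumVariant { .. }) for
    // Transactions or Fin parts.
    part.try_into()
}
```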
+
 impl<T> std::fmt::Display for StreamMessage<T>
 where
     T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
@@ -125,51 +146,51 @@ where
 }

 // TODO(Guy): Remove after implementing broadcast streams.
-#[allow(missing_docs)]
-pub struct ProposalWrapper(pub Proposal);
-
-impl From<ProposalWrapper>
-    for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
-{
-    fn from(val: ProposalWrapper) -> Self {
-        let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
-        let proposal_init = ProposalInit {
-            height: BlockNumber(val.0.height),
-            round: val.0.round,
-            proposer: val.0.proposer,
-            valid_round: val.0.valid_round,
-        };
-        let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
-        for tx in transactions {
-            content_sender.try_send(tx).expect("Send should succeed");
-        }
-        content_sender.close_channel();
-
-        let (fin_sender, fin_receiver) = oneshot::channel();
-        fin_sender.send(val.0.block_hash).expect("Send should succeed");
-
-        (proposal_init, content_receiver, fin_receiver)
-    }
-}
-
-impl From<ProposalWrapper>
-    for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
-{
-    fn from(val: ProposalWrapper) -> Self {
-        let proposal_init = ProposalInit {
-            height: BlockNumber(val.0.height),
-            round: val.0.round,
-            proposer: val.0.proposer,
-            valid_round: val.0.valid_round,
-        };
-
-        let (_, content_receiver) = mpsc::channel(0);
-        // This should only be used for Milestone 1, and then removed once streaming is supported.
-        println!("Cannot build ExecutableTransaction from Transaction.");
-
-        let (fin_sender, fin_receiver) = oneshot::channel();
-        fin_sender.send(val.0.block_hash).expect("Send should succeed");
-
-        (proposal_init, content_receiver, fin_receiver)
-    }
-}
+// #[allow(missing_docs)]
+// pub struct ProposalWrapper(pub Proposal);
+
+// impl From<ProposalWrapper>
+//     for (ProposalInit, mpsc::Receiver<Transaction>, oneshot::Receiver<BlockHash>)
+// {
+//     fn from(val: ProposalWrapper) -> Self {
+//         let transactions: Vec<Transaction> = val.0.transactions.into_iter().collect();
+//         let proposal_init = ProposalInit {
+//             height: BlockNumber(val.0.height),
+//             round: val.0.round,
+//             proposer: val.0.proposer,
+//             valid_round: val.0.valid_round,
+//         };
+//         let (mut content_sender, content_receiver) = mpsc::channel(transactions.len());
+//         for tx in transactions {
+//             content_sender.try_send(tx).expect("Send should succeed");
+//         }
+//         content_sender.close_channel();
+
+//         let (fin_sender, fin_receiver) = oneshot::channel();
+//         fin_sender.send(val.0.block_hash).expect("Send should succeed");
+
+//         (proposal_init, content_receiver, fin_receiver)
+//     }
+// }
+
+// impl From<ProposalWrapper>
+//     for (ProposalInit, mpsc::Receiver<Vec<ExecutableTransaction>>, oneshot::Receiver<BlockHash>)
+// {
+//     fn from(val: ProposalWrapper) -> Self {
+//         let proposal_init = ProposalInit {
+//             height: BlockNumber(val.0.height),
+//             round: val.0.round,
+//             proposer: val.0.proposer,
+//             valid_round: val.0.valid_round,
+//         };
+
+//         let (_, content_receiver) = mpsc::channel(0);
+//         // This should only be used for Milestone 1, and then removed once streaming is
+//         // supported.
println!("Cannot build ExecutableTransaction from Transaction."); + +// let (fin_sender, fin_receiver) = oneshot::channel(); +// fin_sender.send(val.0.block_hash).expect("Send should succeed"); + +// (proposal_init, content_receiver, fin_receiver) +// } +// } diff --git a/crates/papyrus_protobuf/src/converters/mod.rs b/crates/papyrus_protobuf/src/converters/mod.rs index 5fff153657e..caa448ce19f 100644 --- a/crates/papyrus_protobuf/src/converters/mod.rs +++ b/crates/papyrus_protobuf/src/converters/mod.rs @@ -22,6 +22,13 @@ pub enum ProtobufConversionError { MissingField { field_description: &'static str }, #[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")] BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec }, + #[error("Type `{type_description}` got unexpected enum variant {value_as_str}")] + WrongEnumVariant { + type_description: &'static str, + value_as_str: String, + expected: &'static str, + got: &'static str, + }, #[error(transparent)] DecodeError(#[from] DecodeError), /// For CompressionError and serde_json::Error we put the string of the error instead of the diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 5dfa5c7ad52..76bbac98842 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -7,13 +7,13 @@ mod manager_test; use std::collections::BTreeMap; use std::time::Duration; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::{Stream, StreamExt}; use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT}; use papyrus_network::network_manager::BroadcastTopicClientTrait; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalWrapper}; -use starknet_api::block::{BlockHash, BlockNumber}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit}; +use starknet_api::block::BlockNumber; use tracing::{debug, info, instrument}; use crate::config::TimeoutsConfig; @@ -37,13 +37,12 @@ pub async fn run_consensus( consensus_delay: Duration, timeouts: TimeoutsConfig, mut broadcast_channels: BroadcastConsensusMessageChannel, + mut inbound_proposal_receiver: mpsc::Receiver>, mut sync_receiver: SyncReceiverT, ) -> Result<(), ConsensusError> where - ContextT: ConsensusContext, + ContextT: ConsensusContext + 'static, SyncReceiverT: Stream + Unpin, - ProposalWrapper: - Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { info!( "Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}", @@ -57,11 +56,17 @@ where tokio::time::sleep(consensus_delay).await; let mut current_height = start_height; let mut manager = MultiHeightManager::new(validator_id, timeouts); + #[allow(clippy::as_conversions)] // FIXME: use int metrics so `as f64` may be removed. loop { metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64); - let run_height = manager.run_height(&mut context, current_height, &mut broadcast_channels); + let run_height = manager.run_height( + &mut context, + current_height, + &mut broadcast_channels, + &mut inbound_proposal_receiver, + ); // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop // it. 
        // it. We also cannot restart the height; when we dropped the future we dropped the state it
@@ -101,20 +106,13 @@ impl MultiHeightManager {
     /// Assumes that `height` is monotonically increasing across calls for the sake of filtering
     /// `cached_messaged`.
     #[instrument(skip(self, context, broadcast_channels), level = "info")]
-    pub async fn run_height<ContextT>(
+    pub async fn run_height<ContextT: ConsensusContext>(
         &mut self,
         context: &mut ContextT,
         height: BlockNumber,
         broadcast_channels: &mut BroadcastConsensusMessageChannel,
-    ) -> Result<ShcReturn, ConsensusError>
-    where
-        ContextT: ConsensusContext,
-        ProposalWrapper: Into<(
-            ProposalInit,
-            mpsc::Receiver<ContextT::ProposalChunk>,
-            oneshot::Receiver<BlockHash>,
-        )>,
-    {
+        proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
+    ) -> Result<ShcReturn, ConsensusError> {
         let validators = context.validators(height).await;
         info!("running consensus for height {height:?} with validator set {validators:?}");
         let mut shc = SingleHeightConsensus::new(
@@ -140,6 +138,9 @@ impl MultiHeightManager {
             message = next_message(&mut current_height_messages, broadcast_channels) => {
                 self.handle_message(context, height, &mut shc, message?).await?
             },
+            Some(content_receiver) = proposal_receiver.next() => {
+                self.handle_proposal(context, height, &mut shc, content_receiver).await?
+            },
             Some(shc_event) = shc_events.next() => {
                 shc.handle_event(context, shc_event).await?
             },
@@ -156,22 +157,41 @@ impl MultiHeightManager {
         }
     }

+    // Handle a new proposal receiver from the network.
+    async fn handle_proposal<ContextT: ConsensusContext>(
+        &mut self,
+        context: &mut ContextT,
+        height: BlockNumber,
+        shc: &mut SingleHeightConsensus,
+        // proposal_init: (BlockNumber, u32, ContractAddress, Option<u32>),
+        mut content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
+    ) -> Result<ShcReturn, ConsensusError> {
+        let Some(first_part) = content_receiver.next().await else {
+            return Err(ConsensusError::InternalNetworkError(
+                "Proposal receiver closed".to_string(),
+            ));
+        };
+        let proposal_init: ProposalInit = first_part.try_into()?;
+
+        // TODO(guyn): what is the right thing to do if proposal's height doesn't match?
+        if proposal_init.height != height {
+            debug!("Received a proposal for a different height. {:?}", proposal_init);
+            // if message.height() > height.0 {
+            //     self.cached_messages.entry(message.height()).or_default().push(message);
+            // }
+            return Ok(ShcReturn::Tasks(Vec::new()));
+        }
+        shc.handle_proposal(context, proposal_init.into(), content_receiver).await
+    }
+
     // Handle a single consensus message.
-    async fn handle_message<ContextT>(
+    async fn handle_message<ContextT: ConsensusContext>(
         &mut self,
         context: &mut ContextT,
         height: BlockNumber,
         shc: &mut SingleHeightConsensus,
         message: ConsensusMessage,
-    ) -> Result<ShcReturn, ConsensusError>
-    where
-        ContextT: ConsensusContext,
-        ProposalWrapper: Into<(
-            ProposalInit,
-            mpsc::Receiver<ContextT::ProposalChunk>,
-            oneshot::Receiver<BlockHash>,
-        )>,
-    {
+    ) -> Result<ShcReturn, ConsensusError> {
        // TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
        // 1. Malicious - must be capped so a malicious peer can't DoS us.
        // 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
@@ -184,14 +204,16 @@ impl MultiHeightManager {
             return Ok(ShcReturn::Tasks(Vec::new()));
         }
         match message {
-            ConsensusMessage::Proposal(proposal) => {
-                // Special case due to fake streaming.
-                let (proposal_init, content_receiver, fin_receiver) =
-                    ProposalWrapper(proposal).into();
-                let res = shc
-                    .handle_proposal(context, proposal_init, content_receiver, fin_receiver)
-                    .await?;
-                Ok(res)
+            ConsensusMessage::Proposal(_proposal) => {
+                // Special case due to fake streaming. TODO(guyn): We can eliminate this option
+                // and leave only handle_message.
+                // let (proposal_init, content_receiver, fin_receiver) =
+                //     ProposalWrapper(proposal).into();
+                // let res = shc
+                //     .handle_proposal(context, proposal_init.into(), content_receiver, fin_receiver)
+                //     .await?;
+                Err(ConsensusError::InternalNetworkError(
+                    "Proposal variant of ConsensusMessage no longer supported".to_string(),
+                ))
             }
             _ => {
                 let res = shc.handle_message(context, message).await?;
diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs
index 3f94d78fb42..a0e2aa1e4c5 100644
--- a/crates/sequencing/papyrus_consensus/src/manager_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -12,8 +12,15 @@ use papyrus_network::network_manager::test_utils::{
     MockBroadcastedMessagesSender,
     TestSubscriberChannels,
 };
+use papyrus_network::network_manager::BroadcastTopicClient;
 use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote};
+use papyrus_protobuf::consensus::{
+    ConsensusMessage,
+    ProposalInit,
+    ProposalPart,
+    StreamMessage,
+    Vote,
+};
 use papyrus_test_utils::{get_rng, GetTestInstance};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::transaction::Transaction;
@@ -37,7 +44,7 @@ mock! {

     #[async_trait]
     impl ConsensusContext for TestContext {
-        type ProposalChunk = Transaction;
+        type ProposalPart = ProposalPart;

         async fn build_proposal(
             &mut self,
@@ -49,7 +56,7 @@ mock! {
             &mut self,
             height: BlockNumber,
             timeout: Duration,
-            content: mpsc::Receiver<Transaction>
+            content: mpsc::Receiver<ProposalPart>
         ) -> oneshot::Receiver<ProposalContentId>;

         async fn repropose(
@@ -70,6 +77,7 @@ mock! {
             precommits: Vec<Vote>,
         ) -> Result<(), ConsensusError>;
     }
+
 }

 async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg: ConsensusMessage) {
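A sketch of how a test could feed the manager a proposal stream through the new channel (hypothetical helper; the channel shapes follow `run_consensus`'s new `inbound_proposal_receiver` parameter and the mock's `ProposalPart` type):

```rust
use futures::channel::mpsc;
use futures::SinkExt;
use papyrus_protobuf::consensus::{ProposalInit, ProposalPart};

// Emulates the StreamHandler handing the manager one inbound proposal stream.
async fn inject_proposal_stream(
    proposal_streams: &mut mpsc::Sender<mpsc::Receiver<ProposalPart>>,
    init: ProposalInit,
) {
    let (mut parts_sender, parts_receiver) = mpsc::channel(16);
    // run_height reads the first part and converts it into a ProposalInit.
    parts_sender.send(ProposalPart::Init(init)).await.expect("send init");
    // TransactionBatch parts and a Fin part would follow here.
    proposal_streams.send(parts_receiver).await.expect("send stream");
}
```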
diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
index f2ee23c7d34..89c93197124 100644
--- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
+++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
@@ -66,11 +66,7 @@ pub enum ShcTask {
     ///    result without blocking consensus.
     /// 3. Once validation is complete, the manager returns the built proposal to the SHC as an
     ///    event, which can be sent to the SM.
-    ValidateProposal(
-        ProposalInit,
-        oneshot::Receiver<ProposalContentId>, // Block built from the content.
-        oneshot::Receiver<ProposalContentId>, // Fin sent by the proposer.
-    ),
+    ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalContentId)>),
 }

 impl PartialEq for ShcTask {
@@ -82,9 +78,7 @@ impl PartialEq for ShcTask {
             | (ShcTask::Prevote(d1, e1), ShcTask::Prevote(d2, e2))
             | (ShcTask::Precommit(d1, e1), ShcTask::Precommit(d2, e2)) => d1 == d2 && e1 == e2,
             (ShcTask::BuildProposal(r1, _), ShcTask::BuildProposal(r2, _)) => r1 == r2,
-            (ShcTask::ValidateProposal(pi1, _, _), ShcTask::ValidateProposal(pi2, _, _)) => {
-                pi1 == pi2
-            }
+            (ShcTask::ValidateProposal(pi1, _), ShcTask::ValidateProposal(pi2, _)) => pi1 == pi2,
             _ => false,
         }
     }
@@ -118,24 +112,18 @@ impl ShcTask {
                 let proposal_id = receiver.await.expect("Block building failed.");
                 ShcEvent::BuildProposal(StateMachineEvent::GetProposal(Some(proposal_id), round))
             }
-            ShcTask::ValidateProposal(
-                init,
-                id_built_from_content_receiver,
-                fin_from_proposer_receiver,
-            ) => {
-                let proposal_id = match id_built_from_content_receiver.await {
-                    Ok(proposal_id) => Some(proposal_id),
+            ShcTask::ValidateProposal(init, block_receiver) => {
+                let (block_proposal_id, network_proposal_id) = match block_receiver.await {
+                    Ok((block_proposal_id, network_proposal_id)) => {
+                        (Some(block_proposal_id), Some(network_proposal_id))
+                    }
                     // Proposal never received from peer.
-                    Err(_) => None,
-                };
-                let fin = match fin_from_proposer_receiver.await {
-                    Ok(fin) => Some(fin),
-                    // ProposalFin never received from peer.
-                    Err(_) => None,
+                    Err(_) => (None, None),
                 };
+
                 ShcEvent::ValidateProposal(
-                    StateMachineEvent::Proposal(proposal_id, init.round, init.valid_round),
-                    fin,
+                    StateMachineEvent::Proposal(block_proposal_id, init.round, init.valid_round),
+                    network_proposal_id,
                 )
             }
         }
@@ -206,8 +194,7 @@ impl SingleHeightConsensus {
         &mut self,
         context: &mut ContextT,
         init: ProposalInit,
-        p2p_messages_receiver: mpsc::Receiver<ContextT::ProposalChunk>,
-        fin_receiver: oneshot::Receiver<BlockHash>,
+        p2p_messages_receiver: mpsc::Receiver<ContextT::ProposalPart>,
     ) -> Result<ShcReturn, ConsensusError> {
         debug!(
             "Received proposal: height={}, round={}, proposer={:?}",
@@ -230,10 +217,10 @@ impl SingleHeightConsensus {
         // Since validating the proposal is non-blocking, we want to avoid validating the same round
         // twice in parallel. This could be caused by a network repeat or a malicious spam attack.
         proposal_entry.insert(None);
-        let block_receiver = context
+        let validation_receiver = context
             .validate_proposal(self.height, self.timeouts.proposal_timeout, p2p_messages_receiver)
             .await;
-        Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver, fin_receiver)]))
+        Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, validation_receiver)]))
     }

     async fn process_inbound_proposal(
@@ -304,19 +291,19 @@ impl SingleHeightConsensus {
                 )]))
             }
             ShcEvent::ValidateProposal(
-                StateMachineEvent::Proposal(proposal_id, round, valid_round),
-                fin,
+                StateMachineEvent::Proposal(block_proposal_id, round, valid_round),
+                network_proposal_id,
             ) => {
                 // TODO(matan): Switch to signature validation.
-                let id = if proposal_id != fin {
+                let id = if block_proposal_id != network_proposal_id {
                     warn!(
                         "proposal_id built from content receiver does not match fin: {:#064x?} != \
                          {:#064x?}",
-                        proposal_id, fin
+                        block_proposal_id, network_proposal_id
                     );
                     None
                 } else {
-                    proposal_id
+                    block_proposal_id
                 };
                 // Retaining the entry for this round prevents us from receiving another proposal on
                 // this round. If the validations failed, which can be caused by a network issue, we
diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler.rs b/crates/sequencing/papyrus_consensus/src/stream_handler.rs
index 0531ba48a6e..ed57d2781ea 100644
--- a/crates/sequencing/papyrus_consensus/src/stream_handler.rs
+++ b/crates/sequencing/papyrus_consensus/src/stream_handler.rs
@@ -30,7 +32,9 @@ type StreamKey = (PeerId, StreamId);
 const CHANNEL_BUFFER_LENGTH: usize = 100;

 #[derive(Debug, Clone)]
-struct StreamData<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
+struct StreamData<
+    T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + 'static,
+> {
     next_message_id: MessageId,
     // Last message ID. If None, it means we have not yet gotten to it.
     fin_message_id: Option<MessageId>,
@@ -56,7 +58,7 @@ impl<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>>
 /// - Buffering inbound messages and reporting them to the application in order.
 /// - Sending outbound messages to the network, wrapped in StreamMessage.
 pub struct StreamHandler<
-    T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
+    T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + 'static,
 > {
     // For each stream ID from the network, send the application a Receiver
     // that will receive the messages in order. This allows sending such Receivers.
@@ -100,6 +102,41 @@ impl<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>>
         }
     }

+    /// Create a new StreamHandler and start it running in a new task.
+    /// Gets network input/output channels and returns application input/output channels.
+    pub fn get_channels(
+        inbound_network_receiver: BroadcastTopicServer<StreamMessage<T>>,
+        outbound_network_sender: BroadcastTopicClient<StreamMessage<T>>,
+    ) -> (mpsc::Sender<(StreamId, mpsc::Receiver<T>)>, mpsc::Receiver<mpsc::Receiver<T>>) {
+        // The inbound messages come into StreamHandler via inbound_network_receiver,
+        // and are forwarded to the consensus via inbound_internal_receiver
+        // (the StreamHandler keeps the inbound_internal_sender to pass messages).
+        let (inbound_internal_sender, inbound_internal_receiver): (
+            mpsc::Sender<mpsc::Receiver<T>>,
+            mpsc::Receiver<mpsc::Receiver<T>>,
+        ) = mpsc::channel(CHANNEL_BUFFER_LENGTH);
+        // The outbound messages that an application would like to send are:
+        // 1. Sent into outbound_internal_sender as tuples of (StreamId, Receiver<T>)
+        // 2. Ingested by StreamHandler via its outbound_internal_receiver.
+        // 3. Broadcast by the StreamHandler using its outbound_network_sender.
+        let (outbound_internal_sender, outbound_internal_receiver): (
+            mpsc::Sender<(StreamId, mpsc::Receiver<T>)>,
+            mpsc::Receiver<(StreamId, mpsc::Receiver<T>)>,
+        ) = mpsc::channel(CHANNEL_BUFFER_LENGTH);
+
+        let mut stream_handler = StreamHandler::<T>::new(
+            inbound_internal_sender,    // mpsc::Sender<mpsc::Receiver<T>>
+            inbound_network_receiver,   // BroadcastTopicServer<StreamMessage<T>>
+            outbound_internal_receiver, // mpsc::Receiver<(StreamId, mpsc::Receiver<T>)>
+            outbound_network_sender,    // BroadcastTopicClient<StreamMessage<T>>
+        );
+        tokio::spawn(async move {
+            stream_handler.run().await;
+        });
+
+        (outbound_internal_sender, inbound_internal_receiver)
+    }
+
     /// Listen for messages coming from the network and from the application.
     /// - Outbound messages are wrapped as StreamMessage and sent to the network directly.
     /// - Inbound messages are stripped of StreamMessage and buffered until they can be sent in the
diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
index 0bd7250f122..4bd575da8df 100644
--- a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
@@ -83,7 +83,7 @@ mod tests {
             mock_register_broadcast_topic().unwrap();
         let network_sender_to_inbound = mock_network.broadcasted_messages_sender;

-        // The inbound_receiver is given to StreamHandler to inbound to mock network messages.
+        // The inbound_receiver is given to StreamHandler to mock network messages.
         let BroadcastTopicChannels {
             broadcasted_messages_receiver: inbound_receiver,
             broadcast_topic_client: _,
diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs
index 6e31c011c64..40c4bcd2ec9 100644
--- a/crates/sequencing/papyrus_consensus/src/types.rs
+++ b/crates/sequencing/papyrus_consensus/src/types.rs
@@ -29,7 +29,12 @@ pub trait ConsensusContext {
     /// The chunks of content returned when iterating the proposal.
     // In practice I expect this to match the type sent to the network
     // (papyrus_protobuf::ConsensusMessage), and not to be specific to just the block's content.
-    type ProposalChunk;
+    type ProposalPart: TryFrom<Vec<u8>, Error = ProtobufConversionError>
+        + Into<Vec<u8>>
+        + TryInto<ProposalInit, Error = ProtobufConversionError>
+        + From<ProposalInit>
+        + Clone
+        + Send;

     // TODO(matan): The oneshot for receiving the build block could be generalized to just be some
     // future which returns a block.
@@ -63,14 +68,15 @@ pub trait ConsensusContext {
     ///   - `content`: A receiver for the stream of the block's content.
     ///
     /// Returns:
-    /// - A receiver for the block id. If a valid block cannot be built the Sender will be dropped
-    ///   by ConsensusContext.
+    /// - A receiver for a tuple with two block ids, one calculated by the context, one sent from
+    ///   the network. If a valid block cannot be built the Sender will be dropped by
+    ///   ConsensusContext.
     async fn validate_proposal(
         &mut self,
         height: BlockNumber,
         timeout: Duration,
-        content: mpsc::Receiver<Self::ProposalChunk>,
-    ) -> oneshot::Receiver<ProposalContentId>;
+        content: mpsc::Receiver<Self::ProposalPart>,
+    ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>;

     /// This function is called by consensus to retrieve the content of a previously built or
     /// validated proposal. It broadcasts the proposal to the network.
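A sketch of the consumer side of the new two-id contract (it mirrors what `ShcTask::ValidateProposal` does above; the helper name and nil-vote behavior are illustrative):

```rust
use futures::channel::oneshot;
use starknet_api::block::BlockHash;

type ProposalContentId = BlockHash;

// The first id is computed locally by the context; the second is the one the
// proposer sent in its Fin part.
async fn resolve_validation(
    receiver: oneshot::Receiver<(ProposalContentId, ProposalContentId)>,
) -> Option<ProposalContentId> {
    match receiver.await {
        // Accept only when the locally built id matches the network's Fin id.
        Ok((built_id, network_id)) if built_id == network_id => Some(built_id),
        // Mismatch: treat the proposal as invalid.
        Ok(_) => None,
        // Sender dropped: the context could not build a valid block.
        Err(_) => None,
    }
}
```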
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
index 5779d8a1560..378fb6bd42b 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
@@ -23,7 +23,7 @@ use papyrus_consensus::types::{
     ValidatorId,
 };
 use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
-use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, Vote};
+use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote};
 use papyrus_storage::body::BodyStorageReader;
 use papyrus_storage::header::HeaderStorageReader;
 use papyrus_storage::{StorageError, StorageReader};
@@ -51,12 +51,14 @@ impl PapyrusConsensusContext {
     pub fn new(
         storage_reader: StorageReader,
         network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
+        // network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
         num_validators: u64,
         sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
     ) -> Self {
         Self {
             storage_reader,
             network_broadcast_client,
+            // network_proposal_sender,
             validators: (0..num_validators).map(ContractAddress::from).collect(),
             sync_broadcast_sender,
             valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
@@ -66,7 +68,7 @@ impl PapyrusConsensusContext {

 #[async_trait]
 impl ConsensusContext for PapyrusConsensusContext {
-    type ProposalChunk = Transaction;
+    type ProposalPart = ProposalPart;

     async fn build_proposal(
         &mut self,
@@ -141,8 +143,8 @@ impl ConsensusContext for PapyrusConsensusContext {
         &mut self,
         height: BlockNumber,
         _timeout: Duration,
-        mut content: mpsc::Receiver<Transaction>,
-    ) -> oneshot::Receiver<ProposalContentId> {
+        mut content: mpsc::Receiver<ProposalPart>,
+    ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)> {
         let (fin_sender, fin_receiver) = oneshot::channel();

         let storage_reader = self.storage_reader.clone();
@@ -162,20 +164,38 @@ impl ConsensusContext for PapyrusConsensusContext {
                     panic!("Block in {height} was not found in storage despite waiting for it")
                 });

+                // First gather all the transaction batches, then expect a Fin part.
+                let mut content_transactions: Vec<Transaction> = Vec::new();
+                let received_block_hash = loop {
+                    match content.next().await {
+                        Some(ProposalPart::Transactions(batch)) => {
+                            content_transactions.extend(batch.transactions);
+                        }
+                        Some(ProposalPart::Fin(received_fin)) => {
+                            break received_fin.proposal_content_id;
+                        }
+                        Some(part) => panic!("Unexpected proposal part: {part:?}"),
+                        None => panic!("Did not receive a Fin message"),
+                    }
+                };
+
+                // Check each transaction matches the transactions in the storage, in order.
                 for tx in transactions.iter() {
-                    let received_tx = content
-                        .next()
-                        .await
-                        .unwrap_or_else(|| panic!("Not received transaction equals to {tx:?}"));
+                    let received_tx = if content_transactions.is_empty() {
+                        panic!("Received fewer transactions than expected")
+                    } else {
+                        content_transactions.remove(0)
+                    };
                     if tx != &received_tx {
                         panic!("Transactions are not equal. In storage: {tx:?}, received: {received_tx:?}");
                     }
                 }
-                if content.next().await.is_some() {
-                    panic!("Received more transactions than expected");
-                }
-
                 let block_hash = txn
                     .get_block_header(height)
                     .expect("Get header from storage failed")
@@ -192,7 +212,7 @@ impl ConsensusContext for PapyrusConsensusContext {
                 // Done after inserting the proposal into the map to avoid race conditions between
                 // insertion and calls to `repropose`.
                 // This can happen as a result of sync interrupting `run_height`.
-                fin_sender.send(block_hash).unwrap_or_else(|_| {
+                fin_sender.send((block_hash, received_block_hash)).unwrap_or_else(|_| {
                     warn!("Failed to send block to consensus. height={height}");
                 })
             }
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
index 40e23fe2042..73fc0f43b45 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
@@ -11,7 +11,7 @@ use std::time::Duration;

 use async_trait::async_trait;
 use futures::channel::{mpsc, oneshot};
-use futures::StreamExt;
+use futures::{SinkExt, StreamExt};
 use papyrus_consensus::types::{
     ConsensusContext,
     ConsensusError,
@@ -19,9 +19,10 @@ use papyrus_consensus::types::{
     Round,
     ValidatorId,
 };
-use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
 use papyrus_protobuf::consensus::{
     ConsensusMessage,
+    ProposalFin,
     ProposalInit,
     ProposalPart,
     TransactionBatch,
@@ -62,18 +63,20 @@ pub struct SequencerConsensusContext {
     // restarting.
     proposal_id: u64,
     current_height: Option<BlockNumber>,
-    network_broadcast_client: BroadcastTopicClient<ProposalPart>,
+    outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
 }

 impl SequencerConsensusContext {
     pub fn new(
         batcher: Arc<dyn BatcherClient>,
-        network_broadcast_client: BroadcastTopicClient<ProposalPart>,
+        outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
         num_validators: u64,
     ) -> Self {
         Self {
             batcher,
-            network_broadcast_client,
+            outbound_proposal_sender,
             validators: (0..num_validators).map(ValidatorId::from).collect(),
             valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())),
             proposal_id: 0,
@@ -82,10 +85,11 @@ impl SequencerConsensusContext {
     }
 }

+const CHANNEL_SIZE: usize = 100;
+
 #[async_trait]
 impl ConsensusContext for SequencerConsensusContext {
-    // TODO: Switch to ProposalPart when Guy merges the PR.
-    type ProposalChunk = Vec<ExecutableTransaction>;
+    type ProposalPart = ProposalPart;

     async fn build_proposal(
         &mut self,
@@ -125,11 +129,22 @@ impl ConsensusContext for SequencerConsensusContext {
             .await
             .expect("Failed to initiate proposal build");
         debug!("Broadcasting proposal init: {proposal_init:?}");
-        self.network_broadcast_client
-            .broadcast_message(ProposalPart::Init(proposal_init.clone()))
+        let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
+        let stream_id = proposal_init.height.0;
+        self.outbound_proposal_sender
+            .send((stream_id, proposal_receiver))
+            .await
+            .expect("Failed to send proposal receiver");
+        proposal_sender
+            .send(Self::ProposalPart::Init(proposal_init.clone()))
             .await
-            .expect("Failed to broadcast proposal init");
-        let broadcast_client = self.network_broadcast_client.clone();
+            .expect("Failed to send proposal init");
+
         tokio::spawn(
             async move {
                 stream_build_proposal(
                     height,
                     proposal_id,
                     batcher,
                     valid_proposals,
-                    broadcast_client,
+                    proposal_sender,
                     fin_sender,
                 )
                 .await;
-            }
-            .instrument(debug_span!("consensus_build_proposal")),
+            }, // .instrument(debug_span!("consensus_build_proposal")),
         );

         fin_receiver
@@ -152,8 +166,8 @@ impl ConsensusContext for SequencerConsensusContext {
         &mut self,
         height: BlockNumber,
         timeout: Duration,
-        content: mpsc::Receiver<Vec<ExecutableTransaction>>,
-    ) -> oneshot::Receiver<ProposalContentId> {
+        content_receiver: mpsc::Receiver<ProposalPart>,
+    ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)> {
         debug!("Validating proposal for height: {height} with timeout: {timeout:?}");
         let (fin_sender, fin_receiver) = oneshot::channel();
         let batcher = Arc::clone(&self.batcher);
@@ -181,7 +195,7 @@ impl ConsensusContext for SequencerConsensusContext {
                 proposal_id,
                 batcher,
                 valid_proposals,
-                content,
+                content_receiver,
                 fin_sender,
             );
             if let Err(e) = tokio::time::timeout(timeout, validate_fut).await {
@@ -205,7 +219,7 @@ impl ConsensusContext for SequencerConsensusContext {
             .unwrap_or_else(|| panic!("No proposals found for height {height}"))
             .get(&id)
             .unwrap_or_else(|| panic!("No proposal found for height {height} and id {id}"));
-        // TODO: Stream the TXs to the network.
+        // TODO(guyn): Stream the TXs to the network.
     }

     async fn validators(&self, _height: BlockNumber) -> Vec<ValidatorId> {
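The proposer-side protocol, reduced to a sketch (hypothetical helper mirroring `build_proposal` above; one stream per proposal, keyed by height as in this patch, carrying Init, then Transactions batches, then Fin):

```rust
use futures::channel::mpsc;
use futures::SinkExt;
use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart, TransactionBatch};

async fn stream_out_proposal(
    outbound_proposal_sender: &mut mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
    init: ProposalInit,
    batches: Vec<TransactionBatch>,
    fin: ProposalFin,
) {
    let (mut parts_sender, parts_receiver) = mpsc::channel(100);
    let stream_id = init.height.0;
    // Hand the receiving end to the StreamHandler, which wraps each part in a
    // StreamMessage and broadcasts it to the network.
    outbound_proposal_sender.send((stream_id, parts_receiver)).await.expect("send receiver");
    parts_sender.send(ProposalPart::Init(init)).await.expect("send init");
    for batch in batches {
        parts_sender.send(ProposalPart::Transactions(batch)).await.expect("send batch");
    }
    parts_sender.send(ProposalPart::Fin(fin)).await.expect("send fin");
    // Dropping parts_sender closes the stream.
}
```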
@@ -265,16 +279,16 @@ impl SequencerConsensusContext {

 // Handles building a new proposal without blocking consensus:
 // 1. Receive chunks of content from the batcher.
-// 2. Forward these to consensus to be streamed out to the network.
+// 2. Forward these to the stream handler to be streamed out to the network.
 // 3. Once finished, receive the commitment from the batcher.
 // 4. Store the proposal for re-proposal.
-// 5. Send the commitment to consensus.
+// 5. Send the commitment to the stream handler (to send Fin).
 async fn stream_build_proposal(
     height: BlockNumber,
     proposal_id: ProposalId,
     batcher: Arc<dyn BatcherClient>,
     valid_proposals: Arc<Mutex<HeightToIdToContent>>,
-    mut broadcast_client: BroadcastTopicClient<ProposalPart>,
+    mut proposal_sender: mpsc::Sender<ProposalPart>,
     fin_sender: oneshot::Sender<ProposalContentId>,
 ) {
     let mut content = Vec::new();
@@ -301,12 +315,16 @@ async fn stream_build_proposal(
                 }
                 debug!("Broadcasting proposal content: {transaction_hashes:?}");
                 trace!("Broadcasting proposal content: {transactions:?}");
-                broadcast_client
-                    .broadcast_message(ProposalPart::Transactions(TransactionBatch {
-                        transactions,
-                    }))
+                proposal_sender
+                    .send(ProposalPart::Transactions(TransactionBatch { transactions }))
                     .await
                     .expect("Failed to broadcast proposal content");
             }
             GetProposalContent::Finished(id) => {
                 let proposal_content_id = BlockHash(id.state_diff_commitment.0.0);
@@ -325,6 +343,10 @@ async fn stream_build_proposal(
                     .entry(height)
                     .or_default()
                     .insert(proposal_content_id, (content, proposal_id));
+                // proposal_sender
+                //     .send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
+                //     .await
+                //     .expect("Failed to broadcast proposal fin");
                 if fin_sender.send(proposal_content_id).is_err() {
                     // Consensus may exit early (e.g. sync).
                     warn!("Failed to send proposal content id");
@@ -346,36 +368,53 @@ async fn stream_validate_proposal(
     proposal_id: ProposalId,
     batcher: Arc<dyn BatcherClient>,
     valid_proposals: Arc<Mutex<HeightToIdToContent>>,
-    mut content_receiver: mpsc::Receiver<Vec<ExecutableTransaction>>,
-    fin_sender: oneshot::Sender<ProposalContentId>,
+    mut content_receiver: mpsc::Receiver<ProposalPart>,
+    fin_sender: oneshot::Sender<(ProposalContentId, ProposalContentId)>,
 ) {
     let mut content = Vec::new();
-    while let Some(txs) = content_receiver.next().await {
-        content.extend_from_slice(&txs[..]);
-        let input =
-            SendProposalContentInput { proposal_id, content: SendProposalContent::Txs(txs) };
-        let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
-            panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
-        });
-        match response.response {
-            ProposalStatus::Processing => {}
-            ProposalStatus::Finished(fin) => {
-                panic!("Batcher returned Fin before all content was sent: {proposal_id:?} {fin:?}");
-            }
-            ProposalStatus::InvalidProposal => {
-                warn!("Proposal was invalid: {:?}", proposal_id);
-                return;
-            }
-        }
-    }
+    let mut network_block_id = BlockHash::default();
+    while let Some(prop_part) = content_receiver.next().await {
+        match prop_part {
+            ProposalPart::Transactions(TransactionBatch { transactions: txs }) => {
+                content.extend_from_slice(&txs[..]);
+                let input = SendProposalContentInput {
+                    proposal_id,
+                    content: SendProposalContent::Txs(
+                        txs.into_iter().map(Transaction::from).collect(),
+                    ),
+                };
+                let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
+                    panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
+                });
+                match response.response {
+                    ProposalStatus::Processing => {}
+                    ProposalStatus::Finished(fin) => {
+                        panic!(
+                            "Batcher returned Fin before all content was sent: {proposal_id:?} \
+                             {fin:?}"
+                        );
+                    }
+                    ProposalStatus::InvalidProposal => {
+                        warn!("Proposal was invalid: {:?}", proposal_id);
+                        return;
+                    }
+                }
+            }
+            ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => {
+                // Output this along with the ID from the batcher, to compare them.
+                network_block_id = id;
+                break;
+            }
+            _ => {
+                panic!("Invalid proposal part: {:?}", prop_part);
+            }
+        }
+    }
-    // TODO: In the future we will receive a Fin from the network instead of the channel closing.
-    // We will just send the network Fin out along with what the batcher calculates.
     let input = SendProposalContentInput { proposal_id, content: SendProposalContent::Finish };
     let response = batcher
         .send_proposal_content(input)
         .await
         .unwrap_or_else(|e| panic!("Failed to send Fin to batcher: {proposal_id:?}. {e:?}"));
-    let id = match response.response {
+    let response_id = match response.response {
         ProposalStatus::Finished(id) => id,
         ProposalStatus::Processing => {
             panic!("Batcher failed to return Fin after all content was sent: {:?}", proposal_id);
@@ -385,20 +424,26 @@ async fn stream_validate_proposal(
             return;
         }
     };
-    let proposal_content_id = BlockHash(id.state_diff_commitment.0.0);
+    let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0);
+
     info!(
-        "Finished validating proposal {:?}: content_id = {:?}, num_txs = {:?}, height = {:?}",
+        "Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = {:?}, \
+         num_txs = {:?}, height = {:?}",
         proposal_id,
-        proposal_content_id,
+        network_block_id,
+        batcher_block_id,
         content.len(),
         height
     );
     // Update valid_proposals before sending fin to avoid a race condition
     // with `get_proposal` being called before `valid_proposals` is updated.
     let mut valid_proposals = valid_proposals.lock().unwrap();
-    valid_proposals.entry(height).or_default().insert(proposal_content_id, (content, proposal_id));
-    if fin_sender.send(proposal_content_id).is_err() {
+    valid_proposals.entry(height).or_default().insert(
+        batcher_block_id,
+        (content.into_iter().map(Transaction::from).collect(), proposal_id),
+    );
+    if fin_sender.send((batcher_block_id, network_block_id)).is_err() {
         // Consensus may exit early (e.g. sync).
-        warn!("Failed to send proposal content id");
+        warn!("Failed to send proposal content ids");
     }
 }
diff --git a/crates/starknet_api/src/transaction.rs b/crates/starknet_api/src/transaction.rs
index 56de9bfe3e7..a5eb838583a 100644
--- a/crates/starknet_api/src/transaction.rs
+++ b/crates/starknet_api/src/transaction.rs
@@ -126,6 +126,31 @@ impl From<crate::executable_transaction::Transaction> for Transaction {
     }
 }

+impl From<Transaction> for crate::executable_transaction::Transaction {
+    fn from(tx: Transaction) -> Self {
+        match tx {
+            Transaction::L1Handler(_) => {
+                unimplemented!("L1Handler transactions are not supported yet.")
+            }
+            Transaction::Declare(_) => {
+                unimplemented!(
+                    "Cannot convert external (SIERRA) Declare transactions to executable (CASM) \
+                     ones."
+                )
+            }
+            Transaction::Deploy(_tx) => {
+                unimplemented!("Need to figure out how to do this.")
+            }
+            Transaction::DeployAccount(_tx) => {
+                unimplemented!("Need to figure out how to do this.")
+            }
+            Transaction::Invoke(_tx) => {
+                unimplemented!("Need to figure out how to do this.")
+            }
+        }
+    }
+}
+
 #[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
 pub struct TransactionOptions {
     /// Transaction that shouldn't be broadcasted to StarkNet. For example, users that want to