diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 89945cec95..1637c19322 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -23,7 +23,15 @@ use papyrus_consensus::types::{ ValidatorId, }; use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalFin, ProposalInit, ProposalPart, TransactionBatch, Vote}; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + Proposal, + ProposalFin, + ProposalInit, + ProposalPart, + TransactionBatch, + Vote, +}; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader}; diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 83ce87067f..74d2ad7b19 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -343,10 +343,6 @@ async fn stream_build_proposal( .entry(height) .or_default() .insert(proposal_content_id, (content, proposal_id)); - // proposal_sender - // .send(ProposalPart::Fin(ProposalFin { proposal_content_id })) - // .await - // .expect("Failed to broadcast proposal fin"); if fin_sender.send(proposal_content_id).is_err() { // Consensus may exit early (e.g. sync). warn!("Failed to send proposal content id"); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index aced9b6ef6..1b2ef6c171 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -213,11 +213,7 @@ async fn repropose() { broadcasted_messages_receiver: inbound_network_receiver, broadcast_topic_client: outbound_network_sender, } = subscriber_channels; -<<<<<<< HEAD let (outbound_internal_sender, _inbound_internal_receiver, _) = -======= - let (outbound_internal_sender, _inbound_internal_receiver) = ->>>>>>> 883a253be (feat: allow a streamed proposal channel on top of existing one) StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); let mut context = SequencerConsensusContext::new( diff --git a/crates/starknet_api/src/transaction.rs b/crates/starknet_api/src/transaction.rs index f2eea93dad..fede15d67e 100644 --- a/crates/starknet_api/src/transaction.rs +++ b/crates/starknet_api/src/transaction.rs @@ -147,7 +147,7 @@ impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Tra ), _ => { unimplemented!("Unsupported transaction type. Only Invoke is currently supported.") - }, + } } } } diff --git a/crates/starknet_integration_tests/src/flow_test_setup.rs b/crates/starknet_integration_tests/src/flow_test_setup.rs index cecefd60cd..d2cc04d95a 100644 --- a/crates/starknet_integration_tests/src/flow_test_setup.rs +++ b/crates/starknet_integration_tests/src/flow_test_setup.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::ProposalPart; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_gateway_types::errors::GatewaySpecError; @@ -32,7 +32,7 @@ pub struct FlowTestSetup { pub sequencer_node_handle: JoinHandle>, // Channels for consensus proposals, used for asserting the right transactions are proposed. - pub consensus_proposals_channels: BroadcastTopicChannels, + pub consensus_proposals_channels: BroadcastTopicChannels>, } impl FlowTestSetup { diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index fb89e98051..814920f375 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -14,7 +14,7 @@ use mempool_test_utils::starknet_api_test_utils::{ use papyrus_consensus::config::ConsensusConfig; use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::ProposalPart; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; use papyrus_storage::StorageConfig; use reqwest::{Client, Response}; use starknet_api::block::BlockNumber; @@ -49,7 +49,7 @@ pub async fn create_config( chain_info: ChainInfo, rpc_server_addr: SocketAddr, batcher_storage_config: StorageConfig, -) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels) { +) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels>) { let fee_token_addresses = chain_info.fee_token_addresses.clone(); let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone()); let gateway_config = create_gateway_config(chain_info.clone()).await; @@ -77,11 +77,12 @@ pub async fn create_config( } fn create_consensus_manager_config_and_channels() --> (ConsensusManagerConfig, BroadcastTopicChannels) { +-> (ConsensusManagerConfig, BroadcastTopicChannels>) { let (network_config, broadcast_channels) = create_network_config_connected_to_broadcast_channels( papyrus_network::gossipsub_impl::Topic::new( - starknet_consensus_manager::consensus_manager::NETWORK_TOPIC, + // TODO(guyn): return this to NETWORK_TOPIC once we have integrated streaming. + starknet_consensus_manager::consensus_manager::NETWORK_TOPIC2, ), ); let consensus_manager_config = ConsensusManagerConfig { diff --git a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs index 367a97c138..01a9154096 100644 --- a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs +++ b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs @@ -3,7 +3,13 @@ use std::collections::HashSet; use futures::StreamExt; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart}; +use papyrus_protobuf::consensus::{ + ProposalFin, + ProposalInit, + ProposalPart, + StreamMessage, + StreamMessageBody, +}; use papyrus_storage::test_utils::CHAIN_ID_FOR_TESTS; use pretty_assertions::assert_eq; use rstest::{fixture, rstest}; @@ -48,7 +54,7 @@ async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) { } async fn listen_to_broadcasted_messages( - consensus_proposals_channels: &mut BroadcastTopicChannels, + consensus_proposals_channels: &mut BroadcastTopicChannels>, expected_batched_tx_hashes: &[TransactionHash], ) { let chain_id = CHAIN_ID_FOR_TESTS.clone(); @@ -67,18 +73,31 @@ async fn listen_to_broadcasted_messages( "0x4597ceedbef644865917bf723184538ef70d43954d63f5b7d8cb9d1bd4c2c32", )), }; - assert_eq!( - broadcasted_messages_receiver.next().await.unwrap().0.unwrap(), - ProposalPart::Init(expected_proposal_init) - ); + + let incoming_message = broadcasted_messages_receiver.next().await.unwrap().0.unwrap(); + let incoming_stream_id = incoming_message.stream_id; + assert_eq!(incoming_message.message_id, 0); + let incoming_message = incoming_message.message; + let StreamMessageBody::Content(ProposalPart::Init(received_proposal_init)) = incoming_message + else { + panic!("Unexpected init: {:?}", incoming_message); + }; + assert_eq!(received_proposal_init, expected_proposal_init); + + let mut proposal_parts_fin = false; + let mut message_body_fin = false; loop { - match broadcasted_messages_receiver.next().await.unwrap().0.unwrap() { - ProposalPart::Init(init) => panic!("Unexpected init: {:?}", init), - ProposalPart::Fin(proposal_fin) => { + let message = broadcasted_messages_receiver.next().await.unwrap().0.unwrap(); + assert_eq!(message.stream_id, incoming_stream_id); + match message.message { + StreamMessageBody::Content(ProposalPart::Init(init)) => { + panic!("Unexpected init: {:?}", init) + } + StreamMessageBody::Content(ProposalPart::Fin(proposal_fin)) => { assert_eq!(proposal_fin, expected_proposal_fin); - break; + proposal_parts_fin = true; } - ProposalPart::Transactions(transactions) => { + StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => { received_tx_hashes.extend( transactions .transactions @@ -86,6 +105,13 @@ async fn listen_to_broadcasted_messages( .map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()), ); } + // Ignore this, in case it comes out of the network before some of the other messages. + StreamMessageBody::Fin => { + message_body_fin = true; + } + } + if proposal_parts_fin && message_body_fin { + break; } } // Using HashSet to ignore the order of the transactions (broadcast can lead to reordering).