From b294898369de055939b1eb96e759f4ef37c18a8c Mon Sep 17 00:00:00 2001 From: Dan Brownstein Date: Tue, 5 Nov 2024 00:42:37 +0200 Subject: [PATCH] test: add consensus to e2e flow test --- Cargo.lock | 3 + crates/batcher_types/src/batcher_types.rs | 3 + crates/tests-integration/Cargo.toml | 5 + .../src/bin/run_test_rpc_state_reader.rs | 2 +- .../src/integration_test_setup.rs | 8 +- .../src/integration_test_utils.rs | 49 +++++---- .../tests/end_to_end_test.rs | 104 ++++++++---------- 7 files changed, 90 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5fc6f924233..fd53d4dc97d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10273,11 +10273,14 @@ dependencies = [ "blockifier", "cairo-lang-starknet-classes", "chrono", + "futures", "indexmap 2.6.0", "mempool_test_utils", "papyrus_common", "papyrus_config", "papyrus_consensus", + "papyrus_network", + "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "pretty_assertions", diff --git a/crates/batcher_types/src/batcher_types.rs b/crates/batcher_types/src/batcher_types.rs index 49b3f3f1af2..ec686bb966f 100644 --- a/crates/batcher_types/src/batcher_types.rs +++ b/crates/batcher_types/src/batcher_types.rs @@ -36,6 +36,8 @@ pub struct BuildProposalInput { pub deadline: chrono::DateTime, pub retrospective_block_hash: Option, // TODO: Should we get the gas price here? + // proposer address + // kzg } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -56,6 +58,7 @@ pub enum GetProposalContent { } #[derive(Clone, Debug, Serialize, Deserialize)] +// Same as propose? pub struct ValidateProposalInput { pub proposal_id: ProposalId, pub deadline: chrono::DateTime, diff --git a/crates/tests-integration/Cargo.toml b/crates/tests-integration/Cargo.toml index 2b96fed2ecc..78225024617 100644 --- a/crates/tests-integration/Cargo.toml +++ b/crates/tests-integration/Cargo.toml @@ -20,6 +20,8 @@ mempool_test_utils.workspace = true papyrus_common.workspace = true papyrus_config.workspace = true papyrus_consensus.workspace = true +papyrus_network = { workspace = true, features = ["testing"] } +papyrus_protobuf.workspace = true papyrus_rpc.workspace = true papyrus_storage = { workspace = true, features = ["testing"] } reqwest.workspace = true @@ -42,6 +44,9 @@ tokio.workspace = true tracing.workspace = true [dev-dependencies] +futures.workspace = true +papyrus_network.workspace = true +papyrus_protobuf.workspace = true pretty_assertions.workspace = true rstest.workspace = true starknet_sequencer_infra.workspace = true diff --git a/crates/tests-integration/src/bin/run_test_rpc_state_reader.rs b/crates/tests-integration/src/bin/run_test_rpc_state_reader.rs index 9afbc08eff4..2ab413b3bab 100644 --- a/crates/tests-integration/src/bin/run_test_rpc_state_reader.rs +++ b/crates/tests-integration/src/bin/run_test_rpc_state_reader.rs @@ -24,7 +24,7 @@ async fn main() -> anyhow::Result<()> { .await; // Derive the configuration for the sequencer node. - let (config, required_params) = + let (config, required_params, _) = create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await; // Note: the batcher storage file handle is passed as a reference to maintain its ownership in diff --git a/crates/tests-integration/src/integration_test_setup.rs b/crates/tests-integration/src/integration_test_setup.rs index 28eb4e1af61..5a76df6c38e 100644 --- a/crates/tests-integration/src/integration_test_setup.rs +++ b/crates/tests-integration/src/integration_test_setup.rs @@ -1,6 +1,8 @@ use std::net::SocketAddr; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::ProposalPart; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_batcher_types::communication::SharedBatcherClient; @@ -32,6 +34,9 @@ pub struct IntegrationTestSetup { // Handle of the sequencer node. pub sequencer_node_handle: JoinHandle>, + + // Channels for consensus proposals, used for asserting the right transactions are proposed. + pub consensus_proposals_channels: BroadcastTopicChannels, } impl IntegrationTestSetup { @@ -53,7 +58,7 @@ impl IntegrationTestSetup { .await; // Derive the configuration for the mempool node. - let (config, _required_params) = + let (config, _required_params, consensus_proposals_channels) = create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await; let (clients, servers) = create_node_modules(&config); @@ -77,6 +82,7 @@ impl IntegrationTestSetup { batcher_client: clients.get_batcher_client().unwrap(), rpc_storage_file_handle: storage_for_test.rpc_storage_handle, sequencer_node_handle, + consensus_proposals_channels, } } diff --git a/crates/tests-integration/src/integration_test_utils.rs b/crates/tests-integration/src/integration_test_utils.rs index 4740fb91bd1..5be8a5baf86 100644 --- a/crates/tests-integration/src/integration_test_utils.rs +++ b/crates/tests-integration/src/integration_test_utils.rs @@ -1,5 +1,6 @@ use std::future::Future; use std::net::SocketAddr; +use std::time::Duration; use axum::body::Body; use blockifier::context::ChainInfo; @@ -11,6 +12,9 @@ use mempool_test_utils::starknet_api_test_utils::{ MultiAccountTransactionGenerator, }; use papyrus_consensus::config::ConsensusConfig; +use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::ProposalPart; use papyrus_storage::StorageConfig; use reqwest::{Client, Response}; use starknet_api::block::BlockNumber; @@ -28,29 +32,14 @@ use starknet_gateway::config::{ }; use starknet_gateway_types::errors::GatewaySpecError; use starknet_http_server::config::HttpServerConfig; -use starknet_sequencer_node::config::component_config::ComponentConfig; use starknet_sequencer_node::config::test_utils::RequiredParams; -use starknet_sequencer_node::config::{ - ComponentExecutionConfig, - ComponentExecutionMode, - SequencerNodeConfig, -}; +use starknet_sequencer_node::config::SequencerNodeConfig; use tokio::net::TcpListener; pub async fn create_config( rpc_server_addr: SocketAddr, batcher_storage_config: StorageConfig, -) -> (SequencerNodeConfig, RequiredParams) { - // TODO(Arni/ Matan): Enable the consensus in the end to end test. - let components = ComponentConfig { - consensus_manager: ComponentExecutionConfig { - execution_mode: ComponentExecutionMode::Disabled, - local_server_config: None, - ..Default::default() - }, - ..Default::default() - }; - +) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels) { let chain_id = batcher_storage_config.db_config.chain_id.clone(); // TODO(Tsabary): create chain_info in setup, and pass relevant values throughout. let mut chain_info = ChainInfo::create_for_testing(); @@ -60,12 +49,10 @@ pub async fn create_config( let gateway_config = create_gateway_config(chain_info).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let consensus_manager_config = ConsensusManagerConfig { - consensus_config: ConsensusConfig { start_height: BlockNumber(1), ..Default::default() }, - }; + let (consensus_manager_config, consensus_proposals_channels) = + create_consensus_manager_config_and_channels(); ( SequencerNodeConfig { - components, batcher_config, consensus_manager_config, gateway_config, @@ -78,9 +65,29 @@ pub async fn create_config( eth_fee_token_address: fee_token_addresses.eth_fee_token_address, strk_fee_token_address: fee_token_addresses.strk_fee_token_address, }, + consensus_proposals_channels, ) } +fn create_consensus_manager_config_and_channels() +-> (ConsensusManagerConfig, BroadcastTopicChannels) { + let (network_config, broadcast_channels) = + create_network_config_connected_to_broadcast_channels( + papyrus_network::gossipsub_impl::Topic::new( + starknet_consensus_manager::consensus_manager::NETWORK_TOPIC, + ), + ); + let consensus_manager_config = ConsensusManagerConfig { + consensus_config: ConsensusConfig { + start_height: BlockNumber(1), + consensus_delay: Duration::from_secs(3), + network_config, + ..Default::default() + }, + }; + (consensus_manager_config, broadcast_channels) +} + pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig { // TODO(Tsabary): get the latest version from the RPC crate. const RPC_SPEC_VERSION: &str = "V0_8"; diff --git a/crates/tests-integration/tests/end_to_end_test.rs b/crates/tests-integration/tests/end_to_end_test.rs index 24b4b540299..7452c2f81af 100644 --- a/crates/tests-integration/tests/end_to_end_test.rs +++ b/crates/tests-integration/tests/end_to_end_test.rs @@ -1,17 +1,11 @@ +use futures::StreamExt; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::ProposalPart; use pretty_assertions::assert_eq; use rstest::{fixture, rstest}; -use starknet_api::block::BlockNumber; +use starknet_api::core::ChainId; use starknet_api::transaction::TransactionHash; -use starknet_batcher_types::batcher_types::{ - BuildProposalInput, - DecisionReachedInput, - GetProposalContent, - GetProposalContentInput, - ProposalId, - StartHeightInput, -}; -use starknet_batcher_types::communication::SharedBatcherClient; use starknet_integration_tests::integration_test_setup::IntegrationTestSetup; use starknet_integration_tests::integration_test_utils::{ create_integration_test_tx_generator, @@ -25,7 +19,9 @@ fn tx_generator() -> MultiAccountTransactionGenerator { #[rstest] #[tokio::test] -async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) { +async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) { + const LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT: std::time::Duration = + std::time::Duration::from_secs(5); // Setup. let mock_running_system = IntegrationTestSetup::new_from_tx_generator(&tx_generator).await; @@ -34,60 +30,46 @@ async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) { mock_running_system.assert_add_tx_success(tx) }) .await; - - // Test. - run_consensus_for_end_to_end_test( - &mock_running_system.batcher_client, - &expected_batched_tx_hashes, - ) - .await; + // TODO(Dan, Itay): Consider adding a utility function that waits for something to happen. + let join_handle = tokio::spawn(async move { + tokio::time::timeout( + LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT, + listen_to_broadcasted_messages( + mock_running_system.consensus_proposals_channels, + &expected_batched_tx_hashes, + ), + ) + .await + .expect("listen to broadcasted messages should finish in time"); + }); + join_handle.await.expect("Task should succeed"); } -/// This function should mirror -/// [`run_consensus`](papyrus_consensus::manager::run_consensus). It makes requests -/// from the batcher client and asserts the expected responses were received. -pub async fn run_consensus_for_end_to_end_test( - batcher_client: &SharedBatcherClient, +async fn listen_to_broadcasted_messages( + consensus_proposals_channels: BroadcastTopicChannels, expected_batched_tx_hashes: &[TransactionHash], ) { - // Start height. - // TODO(Arni): Get the current height and retrospective_block_hash from the rpc storage or use - // consensus directly. - let current_height = BlockNumber(1); - batcher_client.start_height(StartHeightInput { height: current_height }).await.unwrap(); - - // Build proposal. - let proposal_id = ProposalId(0); - let retrospective_block_hash = None; - let build_proposal_duaration = chrono::TimeDelta::new(1, 0).unwrap(); - batcher_client - .build_proposal(BuildProposalInput { - proposal_id, - deadline: chrono::Utc::now() + build_proposal_duaration, - retrospective_block_hash, - }) - .await - .unwrap(); - - // Get proposal content. - let mut executed_tx_hashes: Vec = vec![]; - let _proposal_commitment = loop { - let response = batcher_client - .get_proposal_content(GetProposalContentInput { proposal_id }) + // TODO(Dan, Guy): retrieve chain ID. Maybe by modifying IntegrationTestSetup to hold it as a + // member, and instantiate the value using StorageTestSetup. + const CHAIN_ID_NAME: &str = "CHAIN_ID_SUBDIR"; + let chain_id = ChainId::Other(CHAIN_ID_NAME.to_string()); + let mut broadcasted_messages_receiver = + consensus_proposals_channels.broadcasted_messages_receiver; + let mut received_tx_hashes = vec![]; + while received_tx_hashes.len() < expected_batched_tx_hashes.len() { + let (message, _broadcasted_message_metadata) = broadcasted_messages_receiver + .next() .await - .unwrap(); - match response.content { - GetProposalContent::Txs(batched_txs) => { - executed_tx_hashes.append(&mut batched_txs.iter().map(|tx| tx.tx_hash()).collect()); - } - GetProposalContent::Finished(proposal_commitment) => { - break proposal_commitment; - } + .unwrap_or_else(|| panic!("Expected to receive a message from the broadcast topic")); + if let ProposalPart::Transactions(transactions) = message.unwrap() { + received_tx_hashes.append( + &mut transactions + .transactions + .iter() + .map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()) + .collect(), + ); } - }; - - // Decision reached. - batcher_client.decision_reached(DecisionReachedInput { proposal_id }).await.unwrap(); - - assert_eq!(expected_batched_tx_hashes, executed_tx_hashes); + } + assert_eq!(received_tx_hashes, expected_batched_tx_hashes); }