diff --git a/crates/starknet_integration_tests/src/flow_test_setup.rs b/crates/starknet_integration_tests/src/flow_test_setup.rs index c2ae15fca8..86916afea9 100644 --- a/crates/starknet_integration_tests/src/flow_test_setup.rs +++ b/crates/starknet_integration_tests/src/flow_test_setup.rs @@ -10,6 +10,7 @@ use starknet_consensus_manager::config::ConsensusManagerConfig; use starknet_gateway_types::errors::GatewaySpecError; use starknet_http_server::config::HttpServerConfig; use starknet_http_server::test_utils::HttpTestClient; +use starknet_mempool_p2p::config::MempoolP2pConfig; use starknet_sequencer_node::config::node_config::SequencerNodeConfig; use starknet_sequencer_node::servers::run_component_servers; use starknet_sequencer_node::utils::create_node_modules; @@ -24,6 +25,7 @@ use crate::utils::{ create_chain_info, create_config, create_consensus_manager_configs_and_channels, + create_mempool_p2p_configs, }; const SEQUENCER_0: usize = 0; @@ -48,28 +50,33 @@ impl FlowTestSetup { let chain_info = create_chain_info(); let accounts = tx_generator.accounts(); - let (mut consensus_manager_configs, consensus_proposals_channels) = + let (consensus_manager_configs, consensus_proposals_channels) = create_consensus_manager_configs_and_channels(SEQUENCER_INDICES.len()); + let [sequencer_0_consensus_manager_config, sequencer_1_consensus_manager_config]: [ConsensusManagerConfig; + 2] = consensus_manager_configs.try_into().unwrap(); - // Take the first config for every sequencer node, and create nodes one after the other in - // order to make sure the ports are not overlapping. - let sequencer_0_consensus_manager_config = consensus_manager_configs.remove(0); + let mempool_p2p_configs = + create_mempool_p2p_configs(SEQUENCER_INDICES.len(), chain_info.chain_id.clone()); + let [sequencer_0_mempool_p2p_config, sequencer_1_mempool_p2p_config]: [MempoolP2pConfig; + 2] = mempool_p2p_configs.try_into().unwrap(); + + // Create nodes one after the other in order to make sure the ports are not overlapping. let sequencer_0 = SequencerSetup::new( accounts.clone(), SEQUENCER_0, chain_info.clone(), &task_executor, sequencer_0_consensus_manager_config, + sequencer_0_mempool_p2p_config, ) .await; - - let sequencer_1_consensus_manager_config = consensus_manager_configs.remove(0); let sequencer_1 = SequencerSetup::new( accounts, SEQUENCER_1, chain_info, &task_executor, sequencer_1_consensus_manager_config, + sequencer_1_mempool_p2p_config, ) .await; @@ -110,6 +117,7 @@ impl SequencerSetup { chain_info: ChainInfo, task_executor: &TokioExecutor, consensus_manager_config: ConsensusManagerConfig, + mempool_p2p_config: MempoolP2pConfig, ) -> Self { let storage_for_test = StorageTestSetup::new(accounts, &chain_info); @@ -128,6 +136,7 @@ impl SequencerSetup { storage_for_test.batcher_storage_config, storage_for_test.state_sync_storage_config, consensus_manager_config, + mempool_p2p_config, ) .await; diff --git a/crates/starknet_integration_tests/src/integration_test_setup.rs b/crates/starknet_integration_tests/src/integration_test_setup.rs index d2778ffd81..8ea1f40e67 100644 --- a/crates/starknet_integration_tests/src/integration_test_setup.rs +++ b/crates/starknet_integration_tests/src/integration_test_setup.rs @@ -15,6 +15,7 @@ use crate::utils::{ create_chain_info, create_config, create_consensus_manager_configs_and_channels, + create_mempool_p2p_configs, }; const SEQUENCER_INDEX: usize = 0; @@ -59,6 +60,8 @@ impl IntegrationTestSetup { let (mut consensus_manager_configs, _consensus_proposals_channels) = create_consensus_manager_configs_and_channels(SEQUENCER_INDICES.len()); + let mut mempool_p2p_configs = + create_mempool_p2p_configs(SEQUENCER_INDICES.len(), chain_info.chain_id.clone()); // Derive the configuration for the sequencer node. let (config, required_params) = create_config( @@ -68,6 +71,7 @@ impl IntegrationTestSetup { storage_for_test.batcher_storage_config, storage_for_test.state_sync_storage_config, consensus_manager_configs.pop().unwrap(), + mempool_p2p_configs.pop().unwrap(), ) .await; diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index 0afe826368..3ebd43355e 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -8,7 +8,10 @@ use blockifier::test_utils::{CairoVersion, RunnableCairo1}; use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransactionGenerator}; use papyrus_consensus::config::ConsensusConfig; use papyrus_consensus::types::{ValidatorId, DEFAULT_VALIDATOR_ID}; -use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels; +use papyrus_network::network_manager::test_utils::{ + create_connected_network_configs, + create_network_configs_connected_to_broadcast_channels, +}; use papyrus_network::network_manager::BroadcastTopicChannels; use papyrus_network::NetworkConfig; use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; @@ -55,6 +58,7 @@ pub async fn create_config( batcher_storage_config: StorageConfig, state_sync_storage_config: StorageConfig, mut consensus_manager_config: ConsensusManagerConfig, + mempool_p2p_config: MempoolP2pConfig, ) -> (SequencerNodeConfig, RequiredParams) { set_validator_id(&mut consensus_manager_config, sequencer_index); let fee_token_addresses = chain_info.fee_token_addresses.clone(); @@ -62,8 +66,6 @@ pub async fn create_config( let gateway_config = create_gateway_config(chain_info.clone()).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let mempool_p2p_config = - create_mempool_p2p_config(sequencer_index, chain_info.chain_id.clone()); let monitoring_endpoint_config = create_monitoring_endpoint_config(sequencer_index); let state_sync_config = create_state_sync_config(state_sync_storage_config, sequencer_index); @@ -131,6 +133,16 @@ pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateRead RpcStateReaderConfig::from_url(format!("http://{rpc_server_addr:?}/rpc/{RPC_SPEC_VERSION}")) } +pub fn create_mempool_p2p_configs(n_mempools: usize, chain_id: ChainId) -> Vec { + create_connected_network_configs(n_mempools) + .into_iter() + .map(|mut network_config| { + network_config.chain_id = chain_id.clone(); + MempoolP2pConfig { network_config, ..Default::default() } + }) + .collect() +} + /// Creates a multi-account transaction generator for integration tests. pub fn create_integration_test_tx_generator() -> MultiAccountTransactionGenerator { let mut tx_generator: MultiAccountTransactionGenerator = @@ -262,16 +274,6 @@ fn set_validator_id(consensus_manager_config: &mut ConsensusManagerConfig, seque .unwrap(); } -fn create_mempool_p2p_config(sequencer_index: usize, chain_id: ChainId) -> MempoolP2pConfig { - let mut config = MempoolP2pConfig::default(); - // When running multiple sequencers on the same machine, we need to make sure their ports are - // different. Use the sequencer_index to differentiate between them. - config.network_config.tcp_port += u16::try_from(sequencer_index).unwrap(); - config.network_config.quic_port += u16::try_from(sequencer_index).unwrap(); - config.network_config.chain_id = chain_id; - config -} - fn create_monitoring_endpoint_config(sequencer_index: usize) -> MonitoringEndpointConfig { let mut config = MonitoringEndpointConfig::default(); config.port += u16::try_from(sequencer_index).unwrap(); diff --git a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs index 4e50a8d259..64b1fc2047 100644 --- a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs +++ b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs @@ -55,6 +55,8 @@ async fn end_to_end_flow(mut tx_generator: MultiAccountTransactionGenerator) { ]; let sequencers = [&mock_running_system.sequencer_0, &mock_running_system.sequencer_1]; + // We use only the first sequencer's gateway to test that the mempools are syncing. + let sequencer_to_add_txs = *sequencers.first().unwrap(); let mut expected_proposer_iter = sequencers.iter().cycle(); // We start at height 1, so we need to skip the proposer of the initial height. expected_proposer_iter.next().unwrap(); @@ -62,13 +64,19 @@ async fn end_to_end_flow(mut tx_generator: MultiAccountTransactionGenerator) { // Build multiple heights to ensure heights are committed. for (height, expected_content_id) in itertools::zip_eq(heights_to_build, expected_content_ids) { debug!("Starting height {}.", height); - let expected_proposer = expected_proposer_iter.next().unwrap(); // Create and send transactions. let expected_batched_tx_hashes = run_integration_test_scenario(&mut tx_generator, &mut |tx| { - expected_proposer.assert_add_tx_success(tx) + sequencer_to_add_txs.assert_add_tx_success(tx) }) .await; + let expected_validator_id = expected_proposer_iter + .next() + .unwrap() + .config + .consensus_manager_config + .consensus_config + .validator_id; // TODO(Dan, Itay): Consider adding a utility function that waits for something to happen. tokio::time::timeout( LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT, @@ -77,7 +85,7 @@ async fn end_to_end_flow(mut tx_generator: MultiAccountTransactionGenerator) { &expected_batched_tx_hashes, height, expected_content_id, - expected_proposer.config.consensus_manager_config.consensus_config.validator_id, + expected_validator_id, ), ) .await