diff --git a/crates/starknet_integration_tests/src/flow_test_setup.rs b/crates/starknet_integration_tests/src/flow_test_setup.rs index f1d6ac6d0f9..ead4014050f 100644 --- a/crates/starknet_integration_tests/src/flow_test_setup.rs +++ b/crates/starknet_integration_tests/src/flow_test_setup.rs @@ -10,6 +10,7 @@ use starknet_consensus_manager::config::ConsensusManagerConfig; use starknet_gateway_types::errors::GatewaySpecError; use starknet_http_server::config::HttpServerConfig; use starknet_http_server::test_utils::HttpTestClient; +use starknet_mempool_p2p::config::MempoolP2pConfig; use starknet_sequencer_node::config::node_config::SequencerNodeConfig; use starknet_sequencer_node::servers::run_component_servers; use starknet_sequencer_node::utils::create_node_modules; @@ -24,6 +25,7 @@ use crate::utils::{ create_chain_info, create_config, create_consensus_manager_configs_and_channels, + create_mempool_p2p_configs, }; const SEQUENCER_0: usize = 0; @@ -50,26 +52,31 @@ impl FlowTestSetup { let accounts = tx_generator.accounts(); let (mut consensus_manager_configs, consensus_proposals_channels) = create_consensus_manager_configs_and_channels(SEQUENCER_INDICES.len()); + let mut mempool_p2p_configs = create_mempool_p2p_configs(SEQUENCER_INDICES.len()); // Take the first config for every sequencer node, and create nodes one after the other in // order to make sure the ports are not overlapping. let sequencer_0_consensus_manager_config = consensus_manager_configs.remove(0); + let sequencer_0_mempool_p2p_config = mempool_p2p_configs.remove(0); let sequencer_0 = SequencerSetup::new( accounts.clone(), SEQUENCER_0, chain_info.clone(), &task_executor, sequencer_0_consensus_manager_config, + sequencer_0_mempool_p2p_config, ) .await; let sequencer_1_consensus_manager_config = consensus_manager_configs.remove(0); + let sequencer_1_mempool_p2p_config = mempool_p2p_configs.remove(0); let sequencer_1 = SequencerSetup::new( accounts, SEQUENCER_1, chain_info, &task_executor, sequencer_1_consensus_manager_config, + sequencer_1_mempool_p2p_config, ) .await; @@ -109,6 +116,7 @@ impl SequencerSetup { chain_info: ChainInfo, task_executor: &TokioExecutor, consensus_manager_config: ConsensusManagerConfig, + mempool_p2p_config: MempoolP2pConfig, ) -> Self { let storage_for_test = StorageTestSetup::new(accounts, &chain_info); @@ -126,6 +134,7 @@ impl SequencerSetup { rpc_server_addr, storage_for_test.batcher_storage_config, consensus_manager_config, + mempool_p2p_config, ) .await; diff --git a/crates/starknet_integration_tests/src/integration_test_setup.rs b/crates/starknet_integration_tests/src/integration_test_setup.rs index c83fb2f6a9a..b010f8b71d4 100644 --- a/crates/starknet_integration_tests/src/integration_test_setup.rs +++ b/crates/starknet_integration_tests/src/integration_test_setup.rs @@ -14,7 +14,7 @@ use crate::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup}; use crate::utils::{ create_chain_info, create_config, - create_consensus_manager_configs_and_channels, + create_consensus_manager_configs_and_channels, create_mempool_p2p_configs, }; const SEQUENCER_INDEX: usize = 0; @@ -55,6 +55,7 @@ impl IntegrationTestSetup { let (mut consensus_manager_configs, _consensus_proposals_channels) = create_consensus_manager_configs_and_channels(SEQUENCER_INDICES.len()); + let mut mempool_p2p_configs = create_mempool_p2p_configs(SEQUENCER_INDICES.len()); // Derive the configuration for the sequencer node. let (config, required_params) = create_config( @@ -63,6 +64,7 @@ impl IntegrationTestSetup { rpc_server_addr, storage_for_test.batcher_storage_config, consensus_manager_configs.pop().unwrap(), + mempool_p2p_configs.pop().unwrap(), ) .await; diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index 4adda594efc..b4260f4212c 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -8,12 +8,14 @@ use blockifier::test_utils::{CairoVersion, RunnableCairo1}; use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransactionGenerator}; use papyrus_consensus::config::ConsensusConfig; use papyrus_consensus::types::ValidatorId; -use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels; +use papyrus_network::network_manager::test_utils::{ + create_connected_network_configs, + create_network_configs_connected_to_broadcast_channels, +}; use papyrus_network::network_manager::BroadcastTopicChannels; use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; use papyrus_storage::StorageConfig; use starknet_api::block::BlockNumber; -use starknet_api::core::ChainId; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_batcher::block_builder::BlockBuilderConfig; @@ -49,6 +51,7 @@ pub async fn create_config( rpc_server_addr: SocketAddr, batcher_storage_config: StorageConfig, mut consensus_manager_config: ConsensusManagerConfig, + mempool_p2p_config: MempoolP2pConfig, ) -> (SequencerNodeConfig, RequiredParams) { set_validator_id(&mut consensus_manager_config, sequencer_index); let fee_token_addresses = chain_info.fee_token_addresses.clone(); @@ -56,8 +59,6 @@ pub async fn create_config( let gateway_config = create_gateway_config(chain_info.clone()).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let mempool_p2p_config = - create_mempool_p2p_config(sequencer_index, chain_info.chain_id.clone()); let monitoring_endpoint_config = create_monitoring_endpoint_config(sequencer_index); ( @@ -116,6 +117,13 @@ pub fn create_consensus_manager_configs_and_channels( (consensus_manager_configs, broadcast_channels) } +pub fn create_mempool_p2p_configs(n_mempools: usize) -> Vec { + create_connected_network_configs(n_mempools) + .into_iter() + .map(|network_config| MempoolP2pConfig { network_config, ..Default::default() }) + .collect() +} + pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig { // TODO(Tsabary): get the latest version from the RPC crate. const RPC_SPEC_VERSION: &str = "V0_8"; @@ -261,16 +269,6 @@ fn set_validator_id(consensus_manager_config: &mut ConsensusManagerConfig, seque .unwrap(); } -fn create_mempool_p2p_config(sequencer_index: usize, chain_id: ChainId) -> MempoolP2pConfig { - let mut config = MempoolP2pConfig::default(); - // When running multiple sequencers on the same machine, we need to make sure their ports are - // different. Use the sequencer_index to differentiate between them. - config.network_config.tcp_port += u16::try_from(sequencer_index).unwrap(); - config.network_config.quic_port += u16::try_from(sequencer_index).unwrap(); - config.network_config.chain_id = chain_id; - config -} - fn create_monitoring_endpoint_config(sequencer_index: usize) -> MonitoringEndpointConfig { let mut config = MonitoringEndpointConfig::default(); config.port += u16::try_from(sequencer_index).unwrap(); diff --git a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs index 090dc1ae654..73bd10c1659 100644 --- a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs +++ b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs @@ -55,6 +55,8 @@ async fn end_to_end_flow(mut tx_generator: MultiAccountTransactionGenerator) { ]; let sequencers = [&mock_running_system.sequencer_0, &mock_running_system.sequencer_1]; + // We use only the first sequencer's gateway to test that the mempools are syncing. + let sequencer_to_add_txs = *sequencers.first().unwrap(); let mut expected_proposer_iter = sequencers.iter().cycle(); // We start at height 1, so we need to skip the proposer of the initial height. expected_proposer_iter.next().unwrap(); @@ -62,13 +64,19 @@ async fn end_to_end_flow(mut tx_generator: MultiAccountTransactionGenerator) { // Build multiple heights to ensure heights are committed. for (height, expected_content_id) in itertools::zip_eq(heights_to_build, expected_content_ids) { debug!("Starting height {}.", height); - let expected_proposer = expected_proposer_iter.next().unwrap(); // Create and send transactions. let expected_batched_tx_hashes = run_integration_test_scenario(&mut tx_generator, &mut |tx| { - expected_proposer.assert_add_tx_success(tx) + sequencer_to_add_txs.assert_add_tx_success(tx) }) .await; + let expected_validator_id = expected_proposer_iter + .next() + .unwrap() + .config + .consensus_manager_config + .consensus_config + .validator_id; // TODO(Dan, Itay): Consider adding a utility function that waits for something to happen. tokio::time::timeout( LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT, @@ -77,7 +85,7 @@ async fn end_to_end_flow(mut tx_generator: MultiAccountTransactionGenerator) { &expected_batched_tx_hashes, height, expected_content_id, - expected_proposer.config.consensus_manager_config.consensus_config.validator_id, + expected_validator_id, ), ) .await