diff --git a/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs b/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs index 29d24535150..5de2a239a00 100644 --- a/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs +++ b/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs @@ -5,36 +5,29 @@ use std::net::SocketAddr; use futures::StreamExt; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels; +use papyrus_network::network_manager::BroadcastTopicChannels; use papyrus_network::network_manager::BroadcastTopicClientTrait; +use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels; use papyrus_protobuf::mempool::RpcTransactionWrapper; use rstest::{fixture, rstest}; use starknet_api::executable_transaction::AccountTransaction; use starknet_api::rpc_transaction::{ - RpcDeployAccountTransaction, - RpcInvokeTransaction, - RpcTransaction, + RpcDeployAccountTransaction, RpcInvokeTransaction, RpcTransaction, }; use starknet_api::transaction::TransactionHash; use starknet_http_server::config::HttpServerConfig; use starknet_http_server::test_utils::HttpTestClient; -use starknet_integration_tests::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup}; +use starknet_integration_tests::state_reader::{StorageTestSetup, spawn_test_rpc_state_reader}; use starknet_integration_tests::utils::{ - create_batcher_config, - create_chain_info, - create_gateway_config, - create_http_server_config, - create_integration_test_tx_generator, - run_integration_test_scenario, + create_batcher_config, create_chain_info, create_gateway_config, create_http_server_config, + create_integration_test_tx_generator, run_integration_test_scenario, test_rpc_state_reader_config, }; -use blockifier::context::ChainInfo; -use starknet_mempool_p2p::config::MempoolP2pConfig; use starknet_mempool_p2p::MEMPOOL_TOPIC; +use starknet_mempool_p2p::config::MempoolP2pConfig; use starknet_sequencer_node::config::component_config::ComponentConfig; use starknet_sequencer_node::config::component_execution_config::{ - ComponentExecutionConfig, - ComponentExecutionMode, + ComponentExecutionConfig, ComponentExecutionMode, }; use starknet_sequencer_node::config::node_config::SequencerNodeConfig; use starknet_sequencer_node::servers::run_component_servers; @@ -47,15 +40,12 @@ fn tx_generator() -> MultiAccountTransactionGenerator { create_integration_test_tx_generator() } -// TODO: remove code duplication with FlowTestSetup -#[rstest] -#[tokio::test] -async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransactionGenerator) { - let handle = Handle::current(); - let task_executor = TokioExecutor::new(handle); - - let chain_info = create_chain_info(); +//TODO: finalize setup +async fn setup( + tx_generator: &MultiAccountTransactionGenerator, +) -> (SequencerNodeConfig, BroadcastTopicChannels) { let accounts = tx_generator.accounts(); + let chain_info = create_chain_info(); let storage_for_test = StorageTestSetup::new(accounts, chain_info.chain_id.clone()); // Spawn a papyrus rpc server for a papyrus storage reader. @@ -85,7 +75,7 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti let gateway_config = create_gateway_config(chain_info).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let (mut network_configs, mut broadcast_channels) = + let (mut network_configs, broadcast_channels) = create_network_configs_connected_to_broadcast_channels::( 1, Topic::new(MEMPOOL_TOPIC), @@ -101,7 +91,15 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti mempool_p2p_config, ..SequencerNodeConfig::default() }; + (config, broadcast_channels) +} +#[rstest] +#[tokio::test] +async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransactionGenerator) { + let handle = Handle::current(); + let task_executor = TokioExecutor::new(handle); + let (config, mut broadcast_channels) = setup(&tx_generator).await; let (_clients, servers) = create_node_modules(&config); let HttpServerConfig { ip, port } = config.http_server_config; @@ -136,49 +134,10 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti #[rstest] #[tokio::test] async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTransactionGenerator) { + const RECEIVED_TX_POLL_INTERVAL: u64 = 100; // milliseconds between calls to read received txs from the broadcast channel let handle = Handle::current(); let task_executor = TokioExecutor::new(handle); - let accounts = tx_generator.accounts(); - let mut chain_info = ChainInfo::create_for_testing(); - let chain_id = chain_info.chain_id.clone(); - let storage_for_test = StorageTestSetup::new(accounts, chain_id.clone()); - // Spawn a papyrus rpc server for a papyrus storage reader. - let rpc_server_addr = - spawn_test_rpc_state_reader(storage_for_test.rpc_storage_reader, chain_id) - .await; - // Derive the configuration for the mempool node. - let components = ComponentConfig { - consensus_manager: ComponentExecutionConfig { - execution_mode: ComponentExecutionMode::Disabled, - local_server_config: None, - ..Default::default() - }, - batcher: ComponentExecutionConfig { - execution_mode: ComponentExecutionMode::Disabled, - local_server_config: None, - ..Default::default() - }, - ..Default::default() - }; - let batcher_config = - create_batcher_config(storage_for_test.batcher_storage_config, chain_info.clone()); - let gateway_config = create_gateway_config(chain_info).await; - let http_server_config = create_http_server_config().await; - let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let (mut network_config, mut broadcast_channels) = - create_network_configs_connected_to_broadcast_channels::(1, Topic::new( - MEMPOOL_TOPIC, - )); - let mempool_p2p_config = MempoolP2pConfig { network_config: network_config.pop().unwrap(), ..Default::default() }; - let config = SequencerNodeConfig { - components, - batcher_config, - gateway_config, - http_server_config, - rpc_state_reader_config, - mempool_p2p_config, - ..SequencerNodeConfig::default() - }; + let (config, mut broadcast_channels) = setup(&tx_generator).await; let (clients, servers) = create_node_modules(&config); let mempool_client = clients.get_mempool_shared_client().unwrap(); // Build and run the sequencer node. @@ -196,7 +155,7 @@ async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTran ready(TransactionHash::default()) // using the default value because we don't use the hash anyways. }) .await; - for tx in expected_txs.iter() { + for tx in &expected_txs { broadcast_channels .broadcast_topic_client .broadcast_message(RpcTransactionWrapper(tx.clone())) @@ -204,15 +163,21 @@ async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTran .unwrap(); } - // waiting for the tx to be received (TODO: figure out a better solution) - tokio::time::sleep(std::time::Duration::from_millis(2000)).await; - - let received_tx = mempool_client.get_txs(expected_txs.len()).await.unwrap(); - - // make sure we received all the transactions - assert_eq!(received_tx.len(), expected_txs.len()); + let mut received_txs: Vec = vec![]; + // Polling for as many rounds as needed up to 2 seconds + for _ in 0..(2000 / RECEIVED_TX_POLL_INTERVAL) { + if received_txs.len() == expected_txs.len() { + break; + } + received_txs.append( + &mut mempool_client.get_txs(expected_txs.len() - received_txs.len()).await.unwrap(), + ); + tokio::time::sleep(std::time::Duration::from_millis(RECEIVED_TX_POLL_INTERVAL)).await; + } + assert_eq!(received_txs.len(), expected_txs.len()); - for tx in received_tx.iter() { + for tx in received_txs { + // TODO: change mempool to store RpcTransaction let converted_tx: RpcTransaction = match tx { AccountTransaction::Declare(_declare_tx) => { panic!("No implementation for converting DeclareTransaction to an RpcTransaction")