From a0f64c02bd0441f787122f829e13b16ff8e90cdf Mon Sep 17 00:00:00 2001 From: Alon Lukatch Date: Tue, 19 Nov 2024 11:30:44 +0200 Subject: [PATCH] test(mempool_p2p): fix CR comments --- .../tests/mempool_p2p_flow_test.rs | 88 +++++++------------ 1 file changed, 30 insertions(+), 58 deletions(-) diff --git a/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs b/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs index 29d24535150..9215e7678ad 100644 --- a/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs +++ b/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs @@ -47,15 +47,12 @@ fn tx_generator() -> MultiAccountTransactionGenerator { create_integration_test_tx_generator() } -// TODO: remove code duplication with FlowTestSetup -#[rstest] -#[tokio::test] -async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransactionGenerator) { - let handle = Handle::current(); - let task_executor = TokioExecutor::new(handle); - - let chain_info = create_chain_info(); +//TODO: finalize setup +async fn setup( + tx_generator: &MultiAccountTransactionGenerator, +) -> (SequencerNodeConfig, BroadcastTopicChannels) { let accounts = tx_generator.accounts(); + let chain_info = create_chain_info(); let storage_for_test = StorageTestSetup::new(accounts, chain_info.chain_id.clone()); // Spawn a papyrus rpc server for a papyrus storage reader. @@ -101,7 +98,15 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti mempool_p2p_config, ..SequencerNodeConfig::default() }; + (config, broadcast_channels) +} +#[rstest] +#[tokio::test] +async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransactionGenerator) { + let handle = Handle::current(); + let task_executor = TokioExecutor::new(handle); + let (config, mut broadcast_channels) = setup(&tx_generator).await; let (_clients, servers) = create_node_modules(&config); let HttpServerConfig { ip, port } = config.http_server_config; @@ -136,49 +141,10 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti #[rstest] #[tokio::test] async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTransactionGenerator) { + const RECEIVED_TX_POLL_INTERVAL: u64 = 100; // milliseconds between calls to read received txs from the broadcast channel let handle = Handle::current(); let task_executor = TokioExecutor::new(handle); - let accounts = tx_generator.accounts(); - let mut chain_info = ChainInfo::create_for_testing(); - let chain_id = chain_info.chain_id.clone(); - let storage_for_test = StorageTestSetup::new(accounts, chain_id.clone()); - // Spawn a papyrus rpc server for a papyrus storage reader. - let rpc_server_addr = - spawn_test_rpc_state_reader(storage_for_test.rpc_storage_reader, chain_id) - .await; - // Derive the configuration for the mempool node. - let components = ComponentConfig { - consensus_manager: ComponentExecutionConfig { - execution_mode: ComponentExecutionMode::Disabled, - local_server_config: None, - ..Default::default() - }, - batcher: ComponentExecutionConfig { - execution_mode: ComponentExecutionMode::Disabled, - local_server_config: None, - ..Default::default() - }, - ..Default::default() - }; - let batcher_config = - create_batcher_config(storage_for_test.batcher_storage_config, chain_info.clone()); - let gateway_config = create_gateway_config(chain_info).await; - let http_server_config = create_http_server_config().await; - let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let (mut network_config, mut broadcast_channels) = - create_network_configs_connected_to_broadcast_channels::(1, Topic::new( - MEMPOOL_TOPIC, - )); - let mempool_p2p_config = MempoolP2pConfig { network_config: network_config.pop().unwrap(), ..Default::default() }; - let config = SequencerNodeConfig { - components, - batcher_config, - gateway_config, - http_server_config, - rpc_state_reader_config, - mempool_p2p_config, - ..SequencerNodeConfig::default() - }; + let (config, mut broadcast_channels) = setup(&tx_generator).await; let (clients, servers) = create_node_modules(&config); let mempool_client = clients.get_mempool_shared_client().unwrap(); // Build and run the sequencer node. @@ -196,7 +162,7 @@ async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTran ready(TransactionHash::default()) // using the default value because we don't use the hash anyways. }) .await; - for tx in expected_txs.iter() { + for tx in &expected_txs { broadcast_channels .broadcast_topic_client .broadcast_message(RpcTransactionWrapper(tx.clone())) @@ -204,15 +170,21 @@ async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTran .unwrap(); } - // waiting for the tx to be received (TODO: figure out a better solution) - tokio::time::sleep(std::time::Duration::from_millis(2000)).await; - - let received_tx = mempool_client.get_txs(expected_txs.len()).await.unwrap(); - - // make sure we received all the transactions - assert_eq!(received_tx.len(), expected_txs.len()); + let mut received_txs: Vec = vec![]; + // Polling for as many rounds as needed up to 2 seconds + for _ in 0..(2000 / RECEIVED_TX_POLL_INTERVAL) { + if received_txs.len() == expected_txs.len() { + break; + } + received_txs.append( + &mut mempool_client.get_txs(expected_txs.len() - received_txs.len()).await.unwrap() + ); + tokio::time::sleep(std::time::Duration::from_millis(RECEIVED_TX_POLL_INTERVAL)).await; + } + assert_eq!(received_txs.len(), expected_txs.len()); - for tx in received_tx.iter() { + for tx in received_txs { + // TODO: change mempool to store RpcTransaction let converted_tx: RpcTransaction = match tx { AccountTransaction::Declare(_declare_tx) => { panic!("No implementation for converting DeclareTransaction to an RpcTransaction")