Skip to content

Commit

Permalink
test(starknet_mempool_p2p): fix CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonLStarkWare committed Dec 2, 2024
1 parent 4152629 commit 466f8b6
Showing 1 changed file with 39 additions and 74 deletions.
113 changes: 39 additions & 74 deletions crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,29 @@ use std::net::SocketAddr;
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels;
use papyrus_protobuf::mempool::RpcTransactionWrapper;
use rstest::{fixture, rstest};
use starknet_api::executable_transaction::AccountTransaction;
use starknet_api::rpc_transaction::{
RpcDeployAccountTransaction,
RpcInvokeTransaction,
RpcTransaction,
RpcDeployAccountTransaction, RpcInvokeTransaction, RpcTransaction,
};
use starknet_api::transaction::TransactionHash;
use starknet_http_server::config::HttpServerConfig;
use starknet_http_server::test_utils::HttpTestClient;
use starknet_integration_tests::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup};
use starknet_integration_tests::state_reader::{StorageTestSetup, spawn_test_rpc_state_reader};
use starknet_integration_tests::utils::{
create_batcher_config,
create_chain_info,
create_gateway_config,
create_http_server_config,
create_integration_test_tx_generator,
run_integration_test_scenario,
create_batcher_config, create_chain_info, create_gateway_config, create_http_server_config,
create_integration_test_tx_generator, run_integration_test_scenario,
test_rpc_state_reader_config,
};
use blockifier::context::ChainInfo;
use starknet_mempool_p2p::config::MempoolP2pConfig;
use starknet_mempool_p2p::MEMPOOL_TOPIC;
use starknet_mempool_p2p::config::MempoolP2pConfig;
use starknet_sequencer_node::config::component_config::ComponentConfig;
use starknet_sequencer_node::config::component_execution_config::{
ComponentExecutionConfig,
ComponentExecutionMode,
ComponentExecutionConfig, ComponentExecutionMode,
};
use starknet_sequencer_node::config::node_config::SequencerNodeConfig;
use starknet_sequencer_node::servers::run_component_servers;
Expand All @@ -47,15 +40,12 @@ fn tx_generator() -> MultiAccountTransactionGenerator {
create_integration_test_tx_generator()
}

// TODO: remove code duplication with FlowTestSetup
#[rstest]
#[tokio::test]
async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransactionGenerator) {
let handle = Handle::current();
let task_executor = TokioExecutor::new(handle);

let chain_info = create_chain_info();
//TODO: finalize setup
async fn setup(
tx_generator: &MultiAccountTransactionGenerator,
) -> (SequencerNodeConfig, BroadcastTopicChannels<RpcTransactionWrapper>) {
let accounts = tx_generator.accounts();
let chain_info = create_chain_info();
let storage_for_test = StorageTestSetup::new(accounts, chain_info.chain_id.clone());

// Spawn a papyrus rpc server for a papyrus storage reader.
Expand Down Expand Up @@ -85,7 +75,7 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let (mut network_configs, mut broadcast_channels) =
let (mut network_configs, broadcast_channels) =
create_network_configs_connected_to_broadcast_channels::<RpcTransactionWrapper>(
1,
Topic::new(MEMPOOL_TOPIC),
Expand All @@ -101,7 +91,15 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti
mempool_p2p_config,
..SequencerNodeConfig::default()
};
(config, broadcast_channels)
}

#[rstest]
#[tokio::test]
async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransactionGenerator) {
let handle = Handle::current();
let task_executor = TokioExecutor::new(handle);
let (config, mut broadcast_channels) = setup(&tx_generator).await;
let (_clients, servers) = create_node_modules(&config);

let HttpServerConfig { ip, port } = config.http_server_config;
Expand Down Expand Up @@ -136,49 +134,10 @@ async fn test_mempool_sends_tx_to_other_peer(tx_generator: MultiAccountTransacti
#[rstest]
#[tokio::test]
async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTransactionGenerator) {
const RECEIVED_TX_POLL_INTERVAL: u64 = 100; // milliseconds between calls to read received txs from the broadcast channel
let handle = Handle::current();
let task_executor = TokioExecutor::new(handle);
let accounts = tx_generator.accounts();
let mut chain_info = ChainInfo::create_for_testing();
let chain_id = chain_info.chain_id.clone();
let storage_for_test = StorageTestSetup::new(accounts, chain_id.clone());
// Spawn a papyrus rpc server for a papyrus storage reader.
let rpc_server_addr =
spawn_test_rpc_state_reader(storage_for_test.rpc_storage_reader, chain_id)
.await;
// Derive the configuration for the mempool node.
let components = ComponentConfig {
consensus_manager: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
batcher: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
..Default::default()
};
let batcher_config =
create_batcher_config(storage_for_test.batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let (mut network_config, mut broadcast_channels) =
create_network_configs_connected_to_broadcast_channels::<RpcTransactionWrapper>(1, Topic::new(
MEMPOOL_TOPIC,
));
let mempool_p2p_config = MempoolP2pConfig { network_config: network_config.pop().unwrap(), ..Default::default() };
let config = SequencerNodeConfig {
components,
batcher_config,
gateway_config,
http_server_config,
rpc_state_reader_config,
mempool_p2p_config,
..SequencerNodeConfig::default()
};
let (config, mut broadcast_channels) = setup(&tx_generator).await;
let (clients, servers) = create_node_modules(&config);
let mempool_client = clients.get_mempool_shared_client().unwrap();
// Build and run the sequencer node.
Expand All @@ -196,23 +155,29 @@ async fn test_mempool_receives_tx_from_other_peer(tx_generator: MultiAccountTran
ready(TransactionHash::default()) // using the default value because we don't use the hash anyways.
})
.await;
for tx in expected_txs.iter() {
for tx in &expected_txs {
broadcast_channels
.broadcast_topic_client
.broadcast_message(RpcTransactionWrapper(tx.clone()))
.await
.unwrap();
}

// waiting for the tx to be received (TODO: figure out a better solution)
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;

let received_tx = mempool_client.get_txs(expected_txs.len()).await.unwrap();

// make sure we received all the transactions
assert_eq!(received_tx.len(), expected_txs.len());
let mut received_txs: Vec<AccountTransaction> = vec![];
// Polling for as many rounds as needed up to 2 seconds
for _ in 0..(2000 / RECEIVED_TX_POLL_INTERVAL) {
if received_txs.len() == expected_txs.len() {
break;
}
received_txs.append(
&mut mempool_client.get_txs(expected_txs.len() - received_txs.len()).await.unwrap(),
);
tokio::time::sleep(std::time::Duration::from_millis(RECEIVED_TX_POLL_INTERVAL)).await;
}
assert_eq!(received_txs.len(), expected_txs.len());

for tx in received_tx.iter() {
for tx in received_txs {
// TODO: change mempool to store RpcTransaction
let converted_tx: RpcTransaction = match tx {
AccountTransaction::Declare(_declare_tx) => {
panic!("No implementation for converting DeclareTransaction to an RpcTransaction")
Expand Down

0 comments on commit 466f8b6

Please sign in to comment.