Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(starknet_mempool_p2p): test tx received from p2p reach mempool #1990

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async fn test_end_to_end_integration(mut tx_generator: MultiAccountTransactionGe
&mut |rpc_tx| integration_test_setup.add_tx_http_client.assert_add_tx_success(rpc_tx);

const ACCOUNT_ID_0: AccountId = 0;

let n_txs = 50;
let sender_address = tx_generator.account_with_id(ACCOUNT_ID_0).sender_address();
info!("Sending {n_txs} txs.");
Expand Down
100 changes: 91 additions & 9 deletions crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
use std::collections::HashSet;
use std::future::ready;
use std::net::SocketAddr;

use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels;
use papyrus_network::network_manager::{BroadcastTopicChannels, BroadcastTopicClientTrait};
use papyrus_protobuf::mempool::RpcTransactionWrapper;
use rstest::{fixture, rstest};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::executable_transaction::AccountTransaction;
use starknet_api::rpc_transaction::{
RpcDeployAccountTransaction,
RpcInvokeTransaction,
RpcTransaction,
};
use starknet_api::transaction::TransactionHash;
use starknet_http_server::config::HttpServerConfig;
use starknet_http_server::test_utils::HttpTestClient;
use starknet_integration_tests::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup};
Expand Down Expand Up @@ -39,14 +47,11 @@ fn tx_generator() -> MultiAccountTransactionGenerator {
}

// TODO: remove code duplication with FlowTestSetup
#[rstest]
#[tokio::test]
async fn test_mempool_sends_tx_to_other_peer(mut tx_generator: MultiAccountTransactionGenerator) {
let handle = Handle::current();
let task_executor = TokioExecutor::new(handle);

let chain_info = create_chain_info();
async fn setup(
tx_generator: &MultiAccountTransactionGenerator,
) -> (SequencerNodeConfig, BroadcastTopicChannels<RpcTransactionWrapper>) {
let accounts = tx_generator.accounts();
let chain_info = create_chain_info();
let storage_for_test = StorageTestSetup::new(accounts, &chain_info);

// Spawn a papyrus rpc server for a papyrus storage reader.
Expand Down Expand Up @@ -76,7 +81,7 @@ async fn test_mempool_sends_tx_to_other_peer(mut tx_generator: MultiAccountTrans
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let (mut network_configs, mut broadcast_channels) =
let (mut network_configs, broadcast_channels) =
create_network_configs_connected_to_broadcast_channels::<RpcTransactionWrapper>(
1,
Topic::new(MEMPOOL_TOPIC),
Expand All @@ -92,7 +97,15 @@ async fn test_mempool_sends_tx_to_other_peer(mut tx_generator: MultiAccountTrans
mempool_p2p_config,
..SequencerNodeConfig::default()
};
(config, broadcast_channels)
}

#[rstest]
#[tokio::test]
async fn test_mempool_sends_tx_to_other_peer(mut tx_generator: MultiAccountTransactionGenerator) {
let handle = Handle::current();
let task_executor = TokioExecutor::new(handle);
let (config, mut broadcast_channels) = setup(&tx_generator).await;
let (_clients, servers) = create_node_modules(&config);

let HttpServerConfig { ip, port } = config.http_server_config;
Expand Down Expand Up @@ -123,3 +136,72 @@ async fn test_mempool_sends_tx_to_other_peer(mut tx_generator: MultiAccountTrans
expected_txs.remove(&tx);
}
}

#[rstest]
#[tokio::test]
async fn test_mempool_receives_tx_from_other_peer(
mut tx_generator: MultiAccountTransactionGenerator,
) {
const RECEIVED_TX_POLL_INTERVAL: u64 = 100; // milliseconds between calls to read received txs from the broadcast channel
const TXS_RETRIVAL_TIMEOUT: u64 = 2000; // max milliseconds spent polling the received txs before timing out

let handle = Handle::current();
let task_executor = TokioExecutor::new(handle);
let (config, mut broadcast_channels) = setup(&tx_generator).await;
let (clients, servers) = create_node_modules(&config);
let mempool_client = clients.get_mempool_shared_client().unwrap();
// Build and run the sequencer node.
let sequencer_node_future = run_component_servers(servers);
let _sequencer_node_handle = task_executor.spawn_with_handle(sequencer_node_future);
// Wait for server to spin up and for p2p to discover other peer.
// TODO(Gilad): Replace with a persistent Client with a built-in retry to protect against CI
// flakiness.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

let mut expected_txs = HashSet::new();

let _tx_hashes = run_integration_test_scenario(&mut tx_generator, &mut |tx: RpcTransaction| {
expected_txs.insert(tx.clone());
ready(TransactionHash::default()) // using the default value because we don't use the hash anyways.
})
.await;
for tx in &expected_txs {
broadcast_channels
.broadcast_topic_client
.broadcast_message(RpcTransactionWrapper(tx.clone()))
.await
.unwrap();
}

let mut received_txs: Vec<AccountTransaction> = vec![];
// Polling for as many rounds as needed up to the set constant
for _ in 0..(TXS_RETRIVAL_TIMEOUT / RECEIVED_TX_POLL_INTERVAL) {
if received_txs.len() == expected_txs.len() {
break;
}
received_txs.append(
// Querying for more txs than we sent verifies there are no extra txs
&mut mempool_client.get_txs(expected_txs.len() - received_txs.len() + 1).await.unwrap(),
);
tokio::time::sleep(std::time::Duration::from_millis(RECEIVED_TX_POLL_INTERVAL)).await;
}
assert_eq!(received_txs.len(), expected_txs.len());

for tx in received_txs {
// TODO: change mempool to store RpcTransaction
let converted_tx: RpcTransaction = match tx {
AccountTransaction::Declare(_declare_tx) => {
panic!("No implementation for converting DeclareTransaction to an RpcTransaction")
}
AccountTransaction::DeployAccount(deploy_account_transaction) => {
RpcTransaction::DeployAccount(RpcDeployAccountTransaction::V3(
deploy_account_transaction.clone().into(),
))
}
AccountTransaction::Invoke(invoke_transaction) => {
RpcTransaction::Invoke(RpcInvokeTransaction::V3(invoke_transaction.clone().into()))
}
};
assert!(expected_txs.contains(&converted_tx));
}
}
Loading