Skip to content

Commit

Permalink
test(starknet_integration_tests): test mempool syncing in e2e flow test
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware committed Dec 10, 2024
1 parent 76b3213 commit 1e2aa1b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 18 deletions.
9 changes: 9 additions & 0 deletions crates/starknet_integration_tests/src/flow_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use starknet_consensus_manager::config::ConsensusManagerConfig;
use starknet_gateway_types::errors::GatewaySpecError;
use starknet_http_server::config::HttpServerConfig;
use starknet_http_server::test_utils::HttpTestClient;
use starknet_mempool_p2p::config::MempoolP2pConfig;
use starknet_sequencer_node::config::node_config::SequencerNodeConfig;
use starknet_sequencer_node::servers::run_component_servers;
use starknet_sequencer_node::utils::create_node_modules;
Expand All @@ -24,6 +25,7 @@ use crate::utils::{
create_chain_info,
create_config,
create_consensus_manager_configs_and_channels,
create_mempool_p2p_configs,
};

const SEQUENCER_0: usize = 0;
Expand All @@ -50,26 +52,31 @@ impl FlowTestSetup {
let accounts = tx_generator.accounts();
let (mut consensus_manager_configs, consensus_proposals_channels) =
create_consensus_manager_configs_and_channels(SEQUENCER_INDICES.len());
let mut mempool_p2p_configs = create_mempool_p2p_configs(SEQUENCER_INDICES.len());

// Take the first config for every sequencer node, and create nodes one after the other in
// order to make sure the ports are not overlapping.
let sequencer_0_consensus_manager_config = consensus_manager_configs.remove(0);
let sequencer_0_mempool_p2p_config = mempool_p2p_configs.remove(0);
let sequencer_0 = SequencerSetup::new(
accounts.clone(),
SEQUENCER_0,
chain_info.clone(),
&task_executor,
sequencer_0_consensus_manager_config,
sequencer_0_mempool_p2p_config,
)
.await;

let sequencer_1_consensus_manager_config = consensus_manager_configs.remove(0);
let sequencer_1_mempool_p2p_config = mempool_p2p_configs.remove(0);
let sequencer_1 = SequencerSetup::new(
accounts,
SEQUENCER_1,
chain_info,
&task_executor,
sequencer_1_consensus_manager_config,
sequencer_1_mempool_p2p_config,
)
.await;

Expand Down Expand Up @@ -109,6 +116,7 @@ impl SequencerSetup {
chain_info: ChainInfo,
task_executor: &TokioExecutor,
consensus_manager_config: ConsensusManagerConfig,
mempool_p2p_config: MempoolP2pConfig,
) -> Self {
let storage_for_test = StorageTestSetup::new(accounts, &chain_info);

Expand All @@ -126,6 +134,7 @@ impl SequencerSetup {
rpc_server_addr,
storage_for_test.batcher_storage_config,
consensus_manager_config,
mempool_p2p_config,
)
.await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup};
use crate::utils::{
create_chain_info,
create_config,
create_consensus_manager_configs_and_channels,
create_consensus_manager_configs_and_channels, create_mempool_p2p_configs,
};

const SEQUENCER_INDEX: usize = 0;
Expand Down Expand Up @@ -55,6 +55,7 @@ impl IntegrationTestSetup {

let (mut consensus_manager_configs, _consensus_proposals_channels) =
create_consensus_manager_configs_and_channels(SEQUENCER_INDICES.len());
let mut mempool_p2p_configs = create_mempool_p2p_configs(SEQUENCER_INDICES.len());

// Derive the configuration for the sequencer node.
let (config, required_params) = create_config(
Expand All @@ -63,6 +64,7 @@ impl IntegrationTestSetup {
rpc_server_addr,
storage_for_test.batcher_storage_config,
consensus_manager_configs.pop().unwrap(),
mempool_p2p_configs.pop().unwrap(),
)
.await;

Expand Down
26 changes: 12 additions & 14 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use blockifier::test_utils::{CairoVersion, RunnableCairo1};
use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransactionGenerator};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::types::ValidatorId;
use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels;
use papyrus_network::network_manager::test_utils::{
create_connected_network_configs,
create_network_configs_connected_to_broadcast_channels,
};
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use papyrus_storage::StorageConfig;
use starknet_api::block::BlockNumber;
use starknet_api::core::ChainId;
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_batcher::block_builder::BlockBuilderConfig;
Expand Down Expand Up @@ -49,15 +51,14 @@ pub async fn create_config(
rpc_server_addr: SocketAddr,
batcher_storage_config: StorageConfig,
mut consensus_manager_config: ConsensusManagerConfig,
mempool_p2p_config: MempoolP2pConfig,
) -> (SequencerNodeConfig, RequiredParams) {
set_validator_id(&mut consensus_manager_config, sequencer_index);
let fee_token_addresses = chain_info.fee_token_addresses.clone();
let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info.clone()).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let mempool_p2p_config =
create_mempool_p2p_config(sequencer_index, chain_info.chain_id.clone());
let monitoring_endpoint_config = create_monitoring_endpoint_config(sequencer_index);

(
Expand Down Expand Up @@ -116,6 +117,13 @@ pub fn create_consensus_manager_configs_and_channels(
(consensus_manager_configs, broadcast_channels)
}

pub fn create_mempool_p2p_configs(n_mempools: usize) -> Vec<MempoolP2pConfig> {
create_connected_network_configs(n_mempools)
.into_iter()
.map(|network_config| MempoolP2pConfig { network_config, ..Default::default() })
.collect()
}

pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig {
// TODO(Tsabary): get the latest version from the RPC crate.
const RPC_SPEC_VERSION: &str = "V0_8";
Expand Down Expand Up @@ -261,16 +269,6 @@ fn set_validator_id(consensus_manager_config: &mut ConsensusManagerConfig, seque
.unwrap();
}

fn create_mempool_p2p_config(sequencer_index: usize, chain_id: ChainId) -> MempoolP2pConfig {
let mut config = MempoolP2pConfig::default();
// When running multiple sequencers on the same machine, we need to make sure their ports are
// different. Use the sequencer_index to differentiate between them.
config.network_config.tcp_port += u16::try_from(sequencer_index).unwrap();
config.network_config.quic_port += u16::try_from(sequencer_index).unwrap();
config.network_config.chain_id = chain_id;
config
}

fn create_monitoring_endpoint_config(sequencer_index: usize) -> MonitoringEndpointConfig {
let mut config = MonitoringEndpointConfig::default();
config.port += u16::try_from(sequencer_index).unwrap();
Expand Down
14 changes: 11 additions & 3 deletions crates/starknet_integration_tests/tests/end_to_end_flow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,28 @@ async fn end_to_end_flow(mut tx_generator: MultiAccountTransactionGenerator) {
];

let sequencers = [&mock_running_system.sequencer_0, &mock_running_system.sequencer_1];
// We use only the first sequencer's gateway to test that the mempools are syncing.
let sequencer_to_add_txs = *sequencers.first().unwrap();
let mut expected_proposer_iter = sequencers.iter().cycle();
// We start at height 1, so we need to skip the proposer of the initial height.
expected_proposer_iter.next().unwrap();

// Build multiple heights to ensure heights are committed.
for (height, expected_content_id) in itertools::zip_eq(heights_to_build, expected_content_ids) {
debug!("Starting height {}.", height);
let expected_proposer = expected_proposer_iter.next().unwrap();
// Create and send transactions.
let expected_batched_tx_hashes =
run_integration_test_scenario(&mut tx_generator, &mut |tx| {
expected_proposer.assert_add_tx_success(tx)
sequencer_to_add_txs.assert_add_tx_success(tx)
})
.await;
let expected_validator_id = expected_proposer_iter
.next()
.unwrap()
.config
.consensus_manager_config
.consensus_config
.validator_id;
// TODO(Dan, Itay): Consider adding a utility function that waits for something to happen.
tokio::time::timeout(
LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT,
Expand All @@ -77,7 +85,7 @@ async fn end_to_end_flow(mut tx_generator: MultiAccountTransactionGenerator) {
&expected_batched_tx_hashes,
height,
expected_content_id,
expected_proposer.config.consensus_manager_config.consensus_config.validator_id,
expected_validator_id,
),
)
.await
Expand Down

0 comments on commit 1e2aa1b

Please sign in to comment.