diff --git a/crates/papyrus_network/src/network_manager/test_utils.rs b/crates/papyrus_network/src/network_manager/test_utils.rs index e7b59e4695c..be1f65427b8 100644 --- a/crates/papyrus_network/src/network_manager/test_utils.rs +++ b/crates/papyrus_network/src/network_manager/test_utils.rs @@ -188,7 +188,7 @@ where const BUFFER_SIZE: usize = 1000; let mut channels_configs = create_connected_network_configs(n_configs + 1); - let broadcast_channels = channels_configs.pop().unwrap(); + let broadcast_channels = channels_configs.remove(0); let mut channels_network_manager = NetworkManager::new(broadcast_channels, None); let broadcast_channels = diff --git a/crates/starknet_integration_tests/src/flow_test_setup.rs b/crates/starknet_integration_tests/src/flow_test_setup.rs index 0954233cacb..9b440333809 100644 --- a/crates/starknet_integration_tests/src/flow_test_setup.rs +++ b/crates/starknet_integration_tests/src/flow_test_setup.rs @@ -1,51 +1,103 @@ use std::net::SocketAddr; -use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; +use blockifier::context::ChainInfo; +use mempool_test_utils::starknet_api_test_utils::{Contract, MultiAccountTransactionGenerator}; use papyrus_network::network_manager::BroadcastTopicChannels; use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; +use starknet_consensus_manager::config::ConsensusManagerConfig; use starknet_gateway_types::errors::GatewaySpecError; use starknet_http_server::config::HttpServerConfig; use starknet_http_server::test_utils::HttpTestClient; -use starknet_sequencer_infra::trace_util::configure_tracing; use starknet_sequencer_node::servers::run_component_servers; use starknet_sequencer_node::utils::create_node_modules; use starknet_task_executor::tokio_executor::TokioExecutor; use tempfile::TempDir; use tokio::runtime::Handle; use tokio::task::JoinHandle; +use tracing::{debug, instrument}; use crate::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup}; -use crate::utils::{create_chain_info, create_config}; +use crate::utils::{ + create_chain_info, + create_config, + create_consensus_manager_configs_and_channels, +}; + +const PROPOSER_ID: usize = 0; +const SEQUENCER_IDS: [usize; 1] = [PROPOSER_ID]; pub struct FlowTestSetup { + // TODO(Tsabary): Remove this field. pub task_executor: TokioExecutor, - - // Client for adding transactions to the sequencer node. - pub add_tx_http_client: HttpTestClient, - - // Handlers for the storage files, maintained so the files are not deleted. - pub batcher_storage_file_handle: TempDir, - pub rpc_storage_file_handle: TempDir, - - // Handle of the sequencer node. - pub sequencer_node_handle: JoinHandle>, + pub proposer: SequencerTestSetup, // Channels for consensus proposals, used for asserting the right transactions are proposed. pub consensus_proposals_channels: BroadcastTopicChannels>, } impl FlowTestSetup { + #[instrument(skip(tx_generator), level = "debug")] pub async fn new_from_tx_generator(tx_generator: &MultiAccountTransactionGenerator) -> Self { let handle = Handle::current(); let task_executor = TokioExecutor::new(handle); let chain_info = create_chain_info(); - // Configure and start tracing. - configure_tracing(); - let accounts = tx_generator.accounts(); + let (mut consensus_manager_configs, consensus_proposals_channels) = + create_consensus_manager_configs_and_channels(SEQUENCER_IDS.len()); + + // Take the first config for every sequencer node. + let proposer_consensus_manager_config = consensus_manager_configs.remove(0); + let proposer = SequencerTestSetup::new( + accounts.clone(), + PROPOSER_ID, + chain_info.clone(), + &task_executor, + proposer_consensus_manager_config, + ) + .await; + + Self { task_executor, proposer, consensus_proposals_channels } + } + + pub async fn assert_add_tx_success(&self, tx: RpcTransaction) -> TransactionHash { + self.proposer.add_tx_http_client.assert_add_tx_success(tx).await + } + + pub async fn assert_add_tx_error(&self, tx: RpcTransaction) -> GatewaySpecError { + self.proposer.add_tx_http_client.assert_add_tx_error(tx).await + } +} + +pub struct SequencerTestSetup { + /// Used to differentiate between different sequencer nodes. + pub sequencer_id: usize, + + // Client for adding transactions to the sequencer node. + pub add_tx_http_client: HttpTestClient, + + // Handlers for the storage files, maintained so the files are not deleted. + pub batcher_storage_file_handle: TempDir, + pub rpc_storage_file_handle: TempDir, + + // Handle of the sequencer node. + pub sequencer_node_handle: JoinHandle>, +} + +impl SequencerTestSetup { + #[instrument( + skip(accounts, chain_info, task_executor, consensus_manager_config), + level = "debug" + )] + pub async fn new( + accounts: Vec, + sequencer_id: usize, + chain_info: ChainInfo, + task_executor: &TokioExecutor, + consensus_manager_config: ConsensusManagerConfig, + ) -> Self { let storage_for_test = StorageTestSetup::new(accounts, chain_info.chain_id.clone()); // Spawn a papyrus rpc server for a papyrus storage reader. @@ -55,10 +107,18 @@ impl FlowTestSetup { ) .await; + debug!("Rpc server spawned at: {}", rpc_server_addr); + // Derive the configuration for the sequencer node. - let (config, _required_params, consensus_proposals_channels) = - create_config(chain_info, rpc_server_addr, storage_for_test.batcher_storage_config) - .await; + let (config, _required_params) = create_config( + chain_info, + rpc_server_addr, + storage_for_test.batcher_storage_config, + consensus_manager_config, + ) + .await; + + debug!("Sequencer config: {:#?}", config); let (_clients, servers) = create_node_modules(&config); @@ -75,20 +135,11 @@ impl FlowTestSetup { tokio::time::sleep(std::time::Duration::from_millis(100)).await; Self { - task_executor, + sequencer_id, add_tx_http_client, batcher_storage_file_handle: storage_for_test.batcher_storage_handle, rpc_storage_file_handle: storage_for_test.rpc_storage_handle, sequencer_node_handle, - consensus_proposals_channels, } } - - pub async fn assert_add_tx_success(&self, tx: RpcTransaction) -> TransactionHash { - self.add_tx_http_client.assert_add_tx_success(tx).await - } - - pub async fn assert_add_tx_error(&self, tx: RpcTransaction) -> GatewaySpecError { - self.add_tx_http_client.assert_add_tx_error(tx).await - } } diff --git a/crates/starknet_integration_tests/src/integration_test_setup.rs b/crates/starknet_integration_tests/src/integration_test_setup.rs index 770c5be4162..aacc0af30e7 100644 --- a/crates/starknet_integration_tests/src/integration_test_setup.rs +++ b/crates/starknet_integration_tests/src/integration_test_setup.rs @@ -11,7 +11,14 @@ use tempfile::{tempdir, TempDir}; use crate::config_utils::dump_config_file_changes; use crate::state_reader::{spawn_test_rpc_state_reader, StorageTestSetup}; -use crate::utils::{create_chain_info, create_config}; +use crate::utils::{ + create_chain_info, + create_config, + create_consensus_manager_configs_and_channels, +}; + +const SEQUENCER_ID: usize = 0; +const SEQUENCER_IDS: [usize; 1] = [SEQUENCER_ID]; pub struct IntegrationTestSetup { // Client for adding transactions to the sequencer node. @@ -47,10 +54,17 @@ impl IntegrationTestSetup { ) .await; + let (mut consensus_manager_configs, _consensus_proposals_channels) = + create_consensus_manager_configs_and_channels(SEQUENCER_IDS.len()); + // Derive the configuration for the sequencer node. - let (config, required_params, _) = - create_config(chain_info, rpc_server_addr, storage_for_test.batcher_storage_config) - .await; + let (config, required_params) = create_config( + chain_info, + rpc_server_addr, + storage_for_test.batcher_storage_config, + consensus_manager_configs.pop().unwrap(), + ) + .await; let node_config_dir_handle = tempdir().unwrap(); let node_config_path = dump_config_file_changes( diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index 663b118b3de..0a0e10e5773 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -38,19 +38,20 @@ pub fn create_chain_info() -> ChainInfo { chain_info } +// TODO(yair, Tsabary): Create config presets for tests, then remove all the functions that modify +// the config. pub async fn create_config( chain_info: ChainInfo, rpc_server_addr: SocketAddr, batcher_storage_config: StorageConfig, -) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels>) { + consensus_manager_config: ConsensusManagerConfig, +) -> (SequencerNodeConfig, RequiredParams) { let fee_token_addresses = chain_info.fee_token_addresses.clone(); let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone()); let gateway_config = create_gateway_config(chain_info.clone()).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let (mut consensus_manager_configs, consensus_proposals_channels) = - create_consensus_manager_configs_and_channels(1); - let consensus_manager_config = consensus_manager_configs.pop().unwrap(); + ( SequencerNodeConfig { batcher_config, @@ -66,11 +67,10 @@ pub async fn create_config( strk_fee_token_address: fee_token_addresses.strk_fee_token_address, sequencer_address: ContractAddress::from(1312_u128), // Arbitrary non-zero value. }, - consensus_proposals_channels, ) } -fn create_consensus_manager_configs_and_channels( +pub fn create_consensus_manager_configs_and_channels( n_managers: usize, ) -> (Vec, BroadcastTopicChannels>) { let (network_configs, broadcast_channels) = diff --git a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs index 2a808996ce1..1dd40e61b35 100644 --- a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs +++ b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs @@ -35,6 +35,8 @@ fn tx_generator() -> MultiAccountTransactionGenerator { #[rstest] #[tokio::test] async fn end_to_end(mut tx_generator: MultiAccountTransactionGenerator) { + starknet_sequencer_infra::trace_util::configure_tracing(); + const LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); // Setup.