From 8944f77a1262d3ec91995c1e808fa2bbd3139888 Mon Sep 17 00:00:00 2001 From: Noam Spiegelstein Date: Wed, 27 Nov 2024 17:17:31 +0200 Subject: [PATCH] feat(starknet_state_sync): create and run state sync servers --- .../starknet_integration_tests/src/utils.rs | 11 ++-- .../tests/mempool_p2p_flow_test.rs | 8 +-- crates/starknet_sequencer_node/src/servers.rs | 50 +++++++++++++++++++ crates/starknet_state_sync/src/lib.rs | 9 +++- crates/starknet_state_sync/src/runner/mod.rs | 2 + 5 files changed, 73 insertions(+), 7 deletions(-) diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index f0318eca1b..0d3c18bb7f 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -65,7 +65,7 @@ pub async fn create_config( let mempool_p2p_config = create_mempool_p2p_config(sequencer_index, chain_info.chain_id.clone()); let monitoring_endpoint_config = create_monitoring_endpoint_config(sequencer_index); - let state_sync_config = create_state_sync_config(state_sync_storage_config); + let state_sync_config = create_state_sync_config(state_sync_storage_config, sequencer_index); ( SequencerNodeConfig { @@ -285,11 +285,16 @@ fn create_monitoring_endpoint_config(sequencer_index: usize) -> MonitoringEndpoi config.port += u16::try_from(sequencer_index).unwrap(); config } -pub fn create_state_sync_config(state_sync_storage_config: StorageConfig) -> StateSyncConfig { + +pub fn create_state_sync_config( + state_sync_storage_config: StorageConfig, + sequencer_index: usize, +) -> StateSyncConfig { StateSyncConfig { storage_config: state_sync_storage_config, network_config: NetworkConfig { - tcp_port: STATE_SYNC_NETWORK_CONFIG_TCP_PORT_FOR_TESTING, + tcp_port: STATE_SYNC_NETWORK_CONFIG_TCP_PORT_FOR_TESTING + + u16::try_from(sequencer_index).unwrap(), ..Default::default() }, ..Default::default() diff --git a/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs b/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs index 1845722161..93e090543b 100644 --- a/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs +++ b/crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs @@ -25,7 +25,6 @@ use starknet_integration_tests::utils::{ create_gateway_config, create_http_server_config, create_integration_test_tx_generator, - create_state_sync_config, run_integration_test_scenario, test_rpc_state_reader_config, }; @@ -71,6 +70,11 @@ async fn setup( local_server_config: None, ..Default::default() }, + state_sync: ReactiveComponentExecutionConfig { + execution_mode: ReactiveComponentExecutionMode::Disabled, + local_server_config: None, + ..Default::default() + }, ..Default::default() }; @@ -79,7 +83,6 @@ async fn setup( let gateway_config = create_gateway_config(chain_info).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let state_sync_config = create_state_sync_config(storage_for_test.state_sync_storage_config); let (mut network_configs, broadcast_channels) = create_network_configs_connected_to_broadcast_channels::( 1, @@ -94,7 +97,6 @@ async fn setup( http_server_config, rpc_state_reader_config, mempool_p2p_config, - state_sync_config, ..SequencerNodeConfig::default() }; (config, broadcast_channels) diff --git a/crates/starknet_sequencer_node/src/servers.rs b/crates/starknet_sequencer_node/src/servers.rs index 690edc2fc8..3c1fcb8822 100644 --- a/crates/starknet_sequencer_node/src/servers.rs +++ b/crates/starknet_sequencer_node/src/servers.rs @@ -20,6 +20,8 @@ use starknet_sequencer_infra::component_server::{ WrapperServer, }; use starknet_sequencer_infra::errors::ComponentServerError; +use starknet_state_sync::runner::StateSyncRunnerServer; +use starknet_state_sync::{LocalStateSyncServer, RemoteStateSyncServer}; use tracing::error; use crate::clients::SequencerNodeClients; @@ -37,6 +39,7 @@ struct LocalServers { pub(crate) gateway: Option>, pub(crate) mempool: Option>, pub(crate) mempool_p2p_propagator: Option>, + pub(crate) state_sync: Option>, } // Component servers that wrap a component without a server. @@ -45,6 +48,7 @@ struct WrapperServers { pub(crate) http_server: Option>, pub(crate) monitoring_endpoint: Option>, pub(crate) mempool_p2p_runner: Option>, + pub(crate) state_sync_runner: Option>, } // Component servers that can run remotely. @@ -54,6 +58,7 @@ pub struct RemoteServers { pub gateway: Option>, pub mempool: Option>, pub mempool_p2p_propagator: Option>, + pub state_sync: Option>, } pub struct SequencerNodeServers { @@ -232,11 +237,18 @@ fn create_local_servers( components.mempool_p2p_propagator, communication.take_mempool_p2p_propagator_rx() ); + let state_sync_server = create_local_server!( + &config.components.state_sync.execution_mode, + components.state_sync, + communication.take_state_sync_rx() + ); + LocalServers { batcher: batcher_server, gateway: gateway_server, mempool: mempool_server, mempool_p2p_propagator: mempool_p2p_propagator_server, + state_sync: state_sync_server, } } @@ -271,11 +283,20 @@ pub fn create_remote_servers( mempool_p2p_propagator_client, config.components.mempool_p2p.remote_server_config ); + + let state_sync_client = clients.get_state_sync_local_client(); + let state_sync_server = create_remote_server!( + &config.components.state_sync.execution_mode, + state_sync_client, + config.components.state_sync.remote_server_config + ); + RemoteServers { batcher: batcher_server, gateway: gateway_server, mempool: mempool_server, mempool_p2p_propagator: mempool_p2p_propagator_server, + state_sync: state_sync_server, } } @@ -301,11 +322,18 @@ fn create_wrapper_servers( &config.components.mempool_p2p.execution_mode.clone().into(), components.mempool_p2p_runner ); + + let state_sync_runner_server = create_wrapper_server!( + &config.components.state_sync.execution_mode.clone().into(), + components.state_sync_runner + ); + WrapperServers { consensus_manager: consensus_manager_server, http_server, monitoring_endpoint: monitoring_endpoint_server, mempool_p2p_runner: mempool_p2p_runner_server, + state_sync_runner: state_sync_runner_server, } } @@ -355,6 +383,13 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res // MempoolP2pRunner server. let mempool_p2p_runner_future = get_server_future(servers.wrapper_servers.mempool_p2p_runner); + // StateSync servers. + let local_state_sync_future = get_server_future(servers.local_servers.state_sync); + let remote_state_sync_future = get_server_future(servers.remote_servers.state_sync); + + // StateSyncRunner server. + let state_sync_runner_future = get_server_future(servers.wrapper_servers.state_sync_runner); + // Start servers. let local_batcher_handle = tokio::spawn(local_batcher_future); let remote_batcher_handle = tokio::spawn(remote_batcher_future); @@ -368,6 +403,9 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res let local_mempool_p2p_propagator_handle = tokio::spawn(local_mempool_p2p_propagator_future); let remote_mempool_p2p_propagator_handle = tokio::spawn(remote_mempool_p2p_propagator_future); let mempool_p2p_runner_handle = tokio::spawn(mempool_p2p_runner_future); + let local_state_sync_handle = tokio::spawn(local_state_sync_future); + let remote_state_sync_handle = tokio::spawn(remote_state_sync_future); + let state_sync_runner_handle = tokio::spawn(state_sync_runner_future); let result = tokio::select! { res = local_batcher_handle => { @@ -418,6 +456,18 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res error!("Mempool P2P Runner Server stopped."); res? } + res = local_state_sync_handle => { + error!("Local State Sync Server stopped."); + res? + } + res = remote_state_sync_handle => { + error!("Remote State Sync Server stopped."); + res? + } + res = state_sync_runner_handle => { + error!("State Sync Runner Server stopped."); + res? + } }; error!("Servers ended with unexpected Ok."); diff --git a/crates/starknet_state_sync/src/lib.rs b/crates/starknet_state_sync/src/lib.rs index 1f86107806..b9ba8fb24d 100644 --- a/crates/starknet_state_sync/src/lib.rs +++ b/crates/starknet_state_sync/src/lib.rs @@ -6,7 +6,8 @@ use papyrus_storage::body::BodyStorageReader; use papyrus_storage::state::StateStorageReader; use papyrus_storage::StorageReader; use starknet_api::block::BlockNumber; -use starknet_sequencer_infra::component_definitions::ComponentRequestHandler; +use starknet_sequencer_infra::component_definitions::{ComponentRequestHandler, ComponentStarter}; +use starknet_sequencer_infra::component_server::{LocalComponentServer, RemoteComponentServer}; use starknet_state_sync_types::communication::{ StateSyncRequest, StateSyncResponse, @@ -57,3 +58,9 @@ impl StateSync { Ok(None) } } + +pub type LocalStateSyncServer = + LocalComponentServer; +pub type RemoteStateSyncServer = RemoteComponentServer; + +impl ComponentStarter for StateSync {} diff --git a/crates/starknet_state_sync/src/runner/mod.rs b/crates/starknet_state_sync/src/runner/mod.rs index defced11d3..d5e8f67149 100644 --- a/crates/starknet_state_sync/src/runner/mod.rs +++ b/crates/starknet_state_sync/src/runner/mod.rs @@ -10,6 +10,7 @@ use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; use papyrus_storage::{open_storage, StorageReader}; use starknet_sequencer_infra::component_definitions::ComponentStarter; +use starknet_sequencer_infra::component_server::WrapperServer; use starknet_sequencer_infra::errors::ComponentError; use crate::config::StateSyncConfig; @@ -95,5 +96,6 @@ impl StateSyncRunner { } } +pub type StateSyncRunnerServer = WrapperServer; // TODO(shahak): fill with a proper version, or allow not specifying the node version. const VERSION_FULL: &str = "";