Skip to content

Commit

Permalink
feat(starknet_state_sync): create and run state sync servers
Browse files Browse the repository at this point in the history
  • Loading branch information
noamsp-starkware committed Dec 15, 2024
1 parent e3ba4f8 commit 8944f77
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 7 deletions.
11 changes: 8 additions & 3 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn create_config(
let mempool_p2p_config =
create_mempool_p2p_config(sequencer_index, chain_info.chain_id.clone());
let monitoring_endpoint_config = create_monitoring_endpoint_config(sequencer_index);
let state_sync_config = create_state_sync_config(state_sync_storage_config);
let state_sync_config = create_state_sync_config(state_sync_storage_config, sequencer_index);

(
SequencerNodeConfig {
Expand Down Expand Up @@ -285,11 +285,16 @@ fn create_monitoring_endpoint_config(sequencer_index: usize) -> MonitoringEndpoi
config.port += u16::try_from(sequencer_index).unwrap();
config
}
pub fn create_state_sync_config(state_sync_storage_config: StorageConfig) -> StateSyncConfig {

pub fn create_state_sync_config(
state_sync_storage_config: StorageConfig,
sequencer_index: usize,
) -> StateSyncConfig {
StateSyncConfig {
storage_config: state_sync_storage_config,
network_config: NetworkConfig {
tcp_port: STATE_SYNC_NETWORK_CONFIG_TCP_PORT_FOR_TESTING,
tcp_port: STATE_SYNC_NETWORK_CONFIG_TCP_PORT_FOR_TESTING
+ u16::try_from(sequencer_index).unwrap(),
..Default::default()
},
..Default::default()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use starknet_integration_tests::utils::{
create_gateway_config,
create_http_server_config,
create_integration_test_tx_generator,
create_state_sync_config,
run_integration_test_scenario,
test_rpc_state_reader_config,
};
Expand Down Expand Up @@ -71,6 +70,11 @@ async fn setup(
local_server_config: None,
..Default::default()
},
state_sync: ReactiveComponentExecutionConfig {
execution_mode: ReactiveComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
..Default::default()
};

Expand All @@ -79,7 +83,6 @@ async fn setup(
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let state_sync_config = create_state_sync_config(storage_for_test.state_sync_storage_config);
let (mut network_configs, broadcast_channels) =
create_network_configs_connected_to_broadcast_channels::<RpcTransactionWrapper>(
1,
Expand All @@ -94,7 +97,6 @@ async fn setup(
http_server_config,
rpc_state_reader_config,
mempool_p2p_config,
state_sync_config,
..SequencerNodeConfig::default()
};
(config, broadcast_channels)
Expand Down
50 changes: 50 additions & 0 deletions crates/starknet_sequencer_node/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use starknet_sequencer_infra::component_server::{
WrapperServer,
};
use starknet_sequencer_infra::errors::ComponentServerError;
use starknet_state_sync::runner::StateSyncRunnerServer;
use starknet_state_sync::{LocalStateSyncServer, RemoteStateSyncServer};
use tracing::error;

use crate::clients::SequencerNodeClients;
Expand All @@ -37,6 +39,7 @@ struct LocalServers {
pub(crate) gateway: Option<Box<LocalGatewayServer>>,
pub(crate) mempool: Option<Box<LocalMempoolServer>>,
pub(crate) mempool_p2p_propagator: Option<Box<LocalMempoolP2pPropagatorServer>>,
pub(crate) state_sync: Option<Box<LocalStateSyncServer>>,
}

// Component servers that wrap a component without a server.
Expand All @@ -45,6 +48,7 @@ struct WrapperServers {
pub(crate) http_server: Option<Box<HttpServer>>,
pub(crate) monitoring_endpoint: Option<Box<MonitoringEndpointServer>>,
pub(crate) mempool_p2p_runner: Option<Box<MempoolP2pRunnerServer>>,
pub(crate) state_sync_runner: Option<Box<StateSyncRunnerServer>>,
}

// Component servers that can run remotely.
Expand All @@ -54,6 +58,7 @@ pub struct RemoteServers {
pub gateway: Option<Box<RemoteGatewayServer>>,
pub mempool: Option<Box<RemoteMempoolServer>>,
pub mempool_p2p_propagator: Option<Box<RemoteMempoolP2pPropagatorServer>>,
pub state_sync: Option<Box<RemoteStateSyncServer>>,
}

pub struct SequencerNodeServers {
Expand Down Expand Up @@ -232,11 +237,18 @@ fn create_local_servers(
components.mempool_p2p_propagator,
communication.take_mempool_p2p_propagator_rx()
);
let state_sync_server = create_local_server!(
&config.components.state_sync.execution_mode,
components.state_sync,
communication.take_state_sync_rx()
);

LocalServers {
batcher: batcher_server,
gateway: gateway_server,
mempool: mempool_server,
mempool_p2p_propagator: mempool_p2p_propagator_server,
state_sync: state_sync_server,
}
}

Expand Down Expand Up @@ -271,11 +283,20 @@ pub fn create_remote_servers(
mempool_p2p_propagator_client,
config.components.mempool_p2p.remote_server_config
);

let state_sync_client = clients.get_state_sync_local_client();
let state_sync_server = create_remote_server!(
&config.components.state_sync.execution_mode,
state_sync_client,
config.components.state_sync.remote_server_config
);

RemoteServers {
batcher: batcher_server,
gateway: gateway_server,
mempool: mempool_server,
mempool_p2p_propagator: mempool_p2p_propagator_server,
state_sync: state_sync_server,
}
}

Expand All @@ -301,11 +322,18 @@ fn create_wrapper_servers(
&config.components.mempool_p2p.execution_mode.clone().into(),
components.mempool_p2p_runner
);

let state_sync_runner_server = create_wrapper_server!(
&config.components.state_sync.execution_mode.clone().into(),
components.state_sync_runner
);

WrapperServers {
consensus_manager: consensus_manager_server,
http_server,
monitoring_endpoint: monitoring_endpoint_server,
mempool_p2p_runner: mempool_p2p_runner_server,
state_sync_runner: state_sync_runner_server,
}
}

Expand Down Expand Up @@ -355,6 +383,13 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res
// MempoolP2pRunner server.
let mempool_p2p_runner_future = get_server_future(servers.wrapper_servers.mempool_p2p_runner);

// StateSync servers.
let local_state_sync_future = get_server_future(servers.local_servers.state_sync);
let remote_state_sync_future = get_server_future(servers.remote_servers.state_sync);

// StateSyncRunner server.
let state_sync_runner_future = get_server_future(servers.wrapper_servers.state_sync_runner);

// Start servers.
let local_batcher_handle = tokio::spawn(local_batcher_future);
let remote_batcher_handle = tokio::spawn(remote_batcher_future);
Expand All @@ -368,6 +403,9 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res
let local_mempool_p2p_propagator_handle = tokio::spawn(local_mempool_p2p_propagator_future);
let remote_mempool_p2p_propagator_handle = tokio::spawn(remote_mempool_p2p_propagator_future);
let mempool_p2p_runner_handle = tokio::spawn(mempool_p2p_runner_future);
let local_state_sync_handle = tokio::spawn(local_state_sync_future);
let remote_state_sync_handle = tokio::spawn(remote_state_sync_future);
let state_sync_runner_handle = tokio::spawn(state_sync_runner_future);

let result = tokio::select! {
res = local_batcher_handle => {
Expand Down Expand Up @@ -418,6 +456,18 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res
error!("Mempool P2P Runner Server stopped.");
res?
}
res = local_state_sync_handle => {
error!("Local State Sync Server stopped.");
res?
}
res = remote_state_sync_handle => {
error!("Remote State Sync Server stopped.");
res?
}
res = state_sync_runner_handle => {
error!("State Sync Runner Server stopped.");
res?
}
};
error!("Servers ended with unexpected Ok.");

Expand Down
9 changes: 8 additions & 1 deletion crates/starknet_state_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::StorageReader;
use starknet_api::block::BlockNumber;
use starknet_sequencer_infra::component_definitions::ComponentRequestHandler;
use starknet_sequencer_infra::component_definitions::{ComponentRequestHandler, ComponentStarter};
use starknet_sequencer_infra::component_server::{LocalComponentServer, RemoteComponentServer};
use starknet_state_sync_types::communication::{
StateSyncRequest,
StateSyncResponse,
Expand Down Expand Up @@ -57,3 +58,9 @@ impl StateSync {
Ok(None)
}
}

pub type LocalStateSyncServer =
LocalComponentServer<StateSync, StateSyncRequest, StateSyncResponse>;
pub type RemoteStateSyncServer = RemoteComponentServer<StateSyncRequest, StateSyncResponse>;

impl ComponentStarter for StateSync {}
2 changes: 2 additions & 0 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_storage::{open_storage, StorageReader};
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::component_server::WrapperServer;
use starknet_sequencer_infra::errors::ComponentError;

use crate::config::StateSyncConfig;
Expand Down Expand Up @@ -95,5 +96,6 @@ impl StateSyncRunner {
}
}

pub type StateSyncRunnerServer = WrapperServer<StateSyncRunner>;
// TODO(shahak): fill with a proper version, or allow not specifying the node version.
const VERSION_FULL: &str = "";

0 comments on commit 8944f77

Please sign in to comment.