Skip to content

Commit

Permalink
refactor(starknet_state_sync): refactor state sync to use p2p sync
Browse files Browse the repository at this point in the history
  • Loading branch information
noamsp-starkware committed Dec 9, 2024
1 parent a0dd2fb commit 5e44f1b
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 90 deletions.
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions crates/starknet_state_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ workspace = true
[dependencies]
async-trait.workspace = true
futures.workspace = true
papyrus_base_layer.workspace = true
papyrus_common.workspace = true
papyrus_config.workspace = true
papyrus_network.workspace = true
papyrus_p2p_sync.workspace = true
papyrus_storage.workspace = true
papyrus_sync.workspace = true
serde.workspace = true
starknet_api = { workspace = true, features = ["testing"] }
starknet_client.workspace = true
starknet_sequencer_infra.workspace = true
starknet_state_sync_types.workspace = true
tokio.workspace = true
Expand Down
19 changes: 8 additions & 11 deletions crates/starknet_state_sync/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
use std::collections::BTreeMap;

use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_config::dumping::{append_sub_config_name, SerializeConfig};
use papyrus_config::{ParamPath, SerializedParam};
use papyrus_network::NetworkConfig;
use papyrus_p2p_sync::client::P2PSyncClientConfig;
use papyrus_storage::StorageConfig;
use papyrus_sync::sources::central::CentralSourceConfig;
use papyrus_sync::SyncConfig;
use serde::{Deserialize, Serialize};
use validator::Validate;

#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Validate)]
pub struct StateSyncConfig {
#[validate]
pub storage_config: StorageConfig,
// TODO(shahak): add validate to SyncConfig, CentralSourceConfig and EthereumBaseLayerConfig
// and use them here.
pub sync_config: SyncConfig,
pub central_config: CentralSourceConfig,
pub base_layer_config: EthereumBaseLayerConfig,
// TODO(shahak): add validate to P2PSyncClientConfig
pub p2p_sync_client_config: P2PSyncClientConfig,
#[validate]
pub network_config: NetworkConfig,
}

impl SerializeConfig for StateSyncConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
vec![
append_sub_config_name(self.storage_config.dump(), "storage_config"),
append_sub_config_name(self.sync_config.dump(), "sync_config"),
append_sub_config_name(self.central_config.dump(), "central_config"),
append_sub_config_name(self.base_layer_config.dump(), "base_layer_config"),
append_sub_config_name(self.p2p_sync_client_config.dump(), "p2p_sync_client_config"),
append_sub_config_name(self.network_config.dump(), "network_config"),
]
.into_iter()
.flatten()
Expand Down
114 changes: 67 additions & 47 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
#[cfg(test)]
mod test;

use std::sync::Arc;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use papyrus_common::pending_classes::PendingClasses;
use papyrus_network::network_manager::{self, NetworkError};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::{open_storage, StorageReader};
use papyrus_sync::sources::base_layer::EthereumBaseLayerSource;
use papyrus_sync::sources::central::CentralSource;
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{
StateSync as PapyrusStateSync,
StateSyncError as PapyrusStateSyncError,
GENESIS_HASH,
};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
use starknet_api::block::BlockNumber;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_state_sync_types::communication::{
Expand All @@ -31,7 +21,6 @@ use starknet_state_sync_types::communication::{
StateSyncResult,
};
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tokio::sync::RwLock;

use crate::config::StateSyncConfig;

Expand All @@ -40,15 +29,24 @@ pub struct StateSyncRunner {
request_receiver: mpsc::Receiver<(StateSyncRequest, oneshot::Sender<StateSyncResponse>)>,
#[allow(dead_code)]
storage_reader: StorageReader,
sync_future: BoxFuture<'static, Result<(), PapyrusStateSyncError>>,
network_future: BoxFuture<'static, Result<(), NetworkError>>,
// TODO: change client and server to requester and responder respectively
p2p_sync_client_future: BoxFuture<'static, Result<(), P2PSyncClientError>>,
p2p_sync_server_future: BoxFuture<'static, ()>,
}

#[async_trait]
impl ComponentStarter for StateSyncRunner {
async fn start(&mut self) -> Result<(), ComponentError> {
loop {
tokio::select! {
result = &mut self.sync_future => return result.map_err(|_| ComponentError::InternalComponentError),
result = &mut self.network_future => {
return result.map_err(|_| ComponentError::InternalComponentError);
}
result = &mut self.p2p_sync_client_future => return result.map_err(|_| ComponentError::InternalComponentError),
() = &mut self.p2p_sync_server_future => {
return Err(ComponentError::InternalComponentError);
}
Some((request, sender)) = self.request_receiver.next() => {
let response = match request {
StateSyncRequest::GetBlock(block_number) => {
Expand All @@ -71,41 +69,63 @@ impl StateSyncRunner {
let (storage_reader, storage_writer) =
open_storage(config.storage_config).expect("StateSyncRunner failed opening storage");

let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData {
// The pending data might change later to DeprecatedPendingBlock, depending on the
// response from the feeder gateway.
block: PendingBlockOrDeprecated::Current(PendingBlock {
parent_block_hash: BlockHash(felt!(GENESIS_HASH)),
..Default::default()
}),
..Default::default()
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));
let mut network_manager = network_manager::NetworkManager::new(
config.network_config,
Some(VERSION_FULL.to_string()),
);

let central_source =
CentralSource::new(config.central_config.clone(), VERSION_FULL, storage_reader.clone())
.expect("Failed creating CentralSource");
// TODO(shahak): add the ability to disable pending sync and disable it here.
let pending_source = PendingSource::new(config.central_config, VERSION_FULL)
.expect("Failed creating PendingSource");
let base_layer_source = EthereumBaseLayerSource::new(config.base_layer_config)
.expect("Failed creating base layer");
let sync = PapyrusStateSync::new(
config.sync_config,
shared_highest_block,
pending_data,
pending_classes,
central_source,
pending_source,
base_layer_source,
let header_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);
let p2p_sync_client = P2PSyncClient::new(
config.p2p_sync_client_config,
storage_reader.clone(),
storage_writer,
p2p_sync_client_channels,
);

let header_server_receiver = network_manager
.register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_server_receiver = network_manager
.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE);
let class_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE);
let event_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);
let p2p_sync_server_channels = P2PSyncServerChannels::new(
header_server_receiver,
state_diff_server_receiver,
transaction_server_receiver,
class_server_receiver,
event_server_receiver,
);
let sync_future = sync.run().boxed();
let p2p_sync_server = P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels);

let network_future = network_manager.run().boxed();
let p2p_sync_client_future = p2p_sync_client.run().boxed();
let p2p_sync_server_future = p2p_sync_server.run().boxed();

// TODO(shahak): add rpc.
Self { request_receiver, storage_reader, sync_future }
Self {
request_receiver,
storage_reader,
network_future,
p2p_sync_client_future,
p2p_sync_server_future,
}
}

fn get_block(&self, block_number: BlockNumber) -> StateSyncResult<Option<SyncBlock>> {
Expand Down
50 changes: 26 additions & 24 deletions crates/starknet_state_sync/src/runner/test.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
use futures::channel::mpsc;
use futures::future::ready;
use futures::FutureExt;
use papyrus_storage::test_utils::get_test_storage;
use papyrus_sync::StateSyncError as PapyrusStateSyncError;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
// TODO: Refactor these to suit the change to state sync now using p2p sync.

use super::StateSyncRunner;
// use futures::channel::mpsc;
// use futures::future::ready;
// use futures::FutureExt;
// use papyrus_storage::test_utils::get_test_storage;
// use papyrus_sync::StateSyncError as PapyrusStateSyncError;
// use starknet_sequencer_infra::component_definitions::ComponentStarter;

const BUFFER_SIZE: usize = 1000;
// use super::StateSyncRunner;

#[test]
fn run_returns_when_sync_future_returns() {
let (_request_sender, request_receiver) = mpsc::channel(BUFFER_SIZE);
let (storage_reader, _storage_writer) = get_test_storage().0;
let sync_future = ready(Ok(())).boxed();
let mut state_sync_runner = StateSyncRunner { request_receiver, storage_reader, sync_future };
state_sync_runner.start().now_or_never().unwrap().unwrap();
}
// const BUFFER_SIZE: usize = 1000;

#[test]
fn run_returns_error_when_sync_future_returns_error() {
let (_request_sender, request_receiver) = mpsc::channel(BUFFER_SIZE);
let (storage_reader, _storage_writer) = get_test_storage().0;
let sync_future = ready(Err(PapyrusStateSyncError::NoProgress)).boxed();
let mut state_sync_runner = StateSyncRunner { request_receiver, storage_reader, sync_future };
state_sync_runner.start().now_or_never().unwrap().unwrap_err();
}
// #[test]
// fn run_returns_when_sync_future_returns() {
// let (_request_sender, request_receiver) = mpsc::channel(BUFFER_SIZE);
// let (storage_reader, _storage_writer) = get_test_storage().0;
// let sync_future = ready(Ok(())).boxed();
// let mut state_sync_runner = StateSyncRunner { request_receiver, storage_reader, sync_future
// }; state_sync_runner.start().now_or_never().unwrap().unwrap();
// }

// #[test]
// fn run_returns_error_when_sync_future_returns_error() {
// let (_request_sender, request_receiver) = mpsc::channel(BUFFER_SIZE);
// let (storage_reader, _storage_writer) = get_test_storage().0;
// let sync_future = ready(Err(PapyrusStateSyncError::NoProgress)).boxed();
// let mut state_sync_runner = StateSyncRunner { request_receiver, storage_reader, sync_future
// }; state_sync_runner.start().now_or_never().unwrap().unwrap_err();
// }

0 comments on commit 5e44f1b

Please sign in to comment.